[SPARK-15543][SQL] Rename DefaultSources to make them more self-describing #13311

Closed · wants to merge 7 commits · changes from all commits
Original file line number Diff line number Diff line change
@@ -1 +1 @@
org.apache.spark.ml.source.libsvm.DefaultSource
org.apache.spark.ml.source.libsvm.LibSVMFileFormat
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@ private[libsvm] class LibSVMOutputWriter(
* .load("data/mllib/sample_libsvm_data.txt")
*
* // Java
* DataFrame df = spark.read().format("libsvm")
* Dataset<Row> df = spark.read().format("libsvm")
* .option("numFeatures, "780")
* .load("data/mllib/sample_libsvm_data.txt");
* }}}
@@ -105,9 +105,13 @@ private[libsvm] class LibSVMOutputWriter(
* - "vectorType": feature vector type, "sparse" (default) or "dense".
*
* @see [[https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/ LIBSVM datasets]]
*
* Note that this class is public for documentation purposes. Please don't use this class directly.
* Rather, use the data source API as illustrated above.
*/
// If this is moved or renamed, please update DataSource's backwardCompatibilityMap.
@Since("1.6.0")
class DefaultSource extends FileFormat with DataSourceRegister {
class LibSVMFileFormat extends FileFormat with DataSourceRegister {
Contributor: Can we add this to backwardCompatibilityMap just to be safe?

Contributor Author: we can ...

Contributor Author: it won't be type safe though.

Contributor Author: i added it
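
With the old name in backwardCompatibilityMap, existing jobs that reference the legacy class keep working. A minimal sketch, assuming an active SparkSession named `spark` and the sample path from the docs above:

```scala
// Hedged sketch: after this PR, both spellings resolve to the renamed LibSVM source.
val byLegacyName = spark.read
  .format("org.apache.spark.ml.source.libsvm.DefaultSource") // old class name, kept via backwardCompatibilityMap
  .option("numFeatures", "780")
  .load("data/mllib/sample_libsvm_data.txt")

val byShortName = spark.read
  .format("libsvm") // short name registered through DataSourceRegister
  .option("numFeatures", "780")
  .load("data/mllib/sample_libsvm_data.txt")
```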


@Since("1.6.0")
override def shortName(): String = "libsvm"
4 changes: 3 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
@@ -68,7 +68,9 @@ object MimaExcludes {
// SPARK-13664 Replace HadoopFsRelation with FileFormat
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.source.libsvm.LibSVMRelation"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelationProvider"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache"),
// SPARK-15543 Rename DefaultSources to make them more self-describing
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.source.libsvm.DefaultSource")
) ++ Seq(
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"),
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
org.apache.spark.sql.execution.datasources.csv.DefaultSource
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
org.apache.spark.sql.execution.datasources.json.DefaultSource
org.apache.spark.sql.execution.datasources.parquet.DefaultSource
org.apache.spark.sql.execution.datasources.text.DefaultSource
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
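
The entries above are DataSourceRegister registrations picked up through the JVM service-loader mechanism, which is what keeps the short names ("csv", "json", "parquet", ...) resolvable after the rename. A rough sketch of that lookup, assuming the standard java.util.ServiceLoader API (the exact code in DataSource may differ):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Hedged sketch: enumerate every registered data source and match on its short name.
val registered = ServiceLoader
  .load(classOf[DataSourceRegister], Thread.currentThread().getContextClassLoader)
  .asScala

val csvProvider = registered.find(_.shortName().equalsIgnoreCase("csv"))
csvProvider.foreach(p => println(p.getClass.getCanonicalName)) // e.g. ...csv.CSVFileFormat
```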
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
Original file line number Diff line number Diff line change
@@ -30,6 +30,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -74,15 +78,34 @@ case class DataSource(
lazy val sourceInfo = sourceSchema()

/** A map to maintain backward compatibility in case we move data sources around. */
private val backwardCompatibilityMap = Map(
"org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
"org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
"org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
"org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
"org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
"org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName,
"com.databricks.spark.csv" -> classOf[csv.DefaultSource].getCanonicalName
)
private val backwardCompatibilityMap: Map[String, String] = {
val jdbc = classOf[JdbcRelationProvider].getCanonicalName
val json = classOf[JsonFileFormat].getCanonicalName
val parquet = classOf[ParquetFileFormat].getCanonicalName
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"

Map(
"org.apache.spark.sql.jdbc" -> jdbc,
"org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
"org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
"org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
"org.apache.spark.sql.json" -> json,
"org.apache.spark.sql.json.DefaultSource" -> json,
"org.apache.spark.sql.execution.datasources.json" -> json,
"org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
"org.apache.spark.sql.parquet" -> parquet,
"org.apache.spark.sql.parquet.DefaultSource" -> parquet,
"org.apache.spark.sql.execution.datasources.parquet" -> parquet,
"org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
"org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
"org.apache.spark.sql.hive.orc" -> orc,
"org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
"org.apache.spark.ml.source.libsvm" -> libsvm,
"com.databricks.spark.csv" -> csv
)
}

/**
* Classes that were removed in Spark 2.0. Used to detect libraries that are incompatible with Spark 2.0.
@@ -188,7 +211,7 @@ case class DataSource(
throw new IllegalArgumentException("'path' is not specified")
})
val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
val isTextSource = providingClass == classOf[text.DefaultSource]
val isTextSource = providingClass == classOf[text.TextFileFormat]
// If the schema inference is disabled, only text sources require schema to be specified
if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
throw new IllegalArgumentException(
@@ -229,7 +252,7 @@ case class DataSource(
providingClass.newInstance() match {
case s: StreamSinkProvider => s.createSink(sparkSession.sqlContext, options, partitionColumns)

case parquet: parquet.DefaultSource =>
case parquet: parquet.ParquetFileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
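The backwardCompatibilityMap added above is consulted before the provider class is loaded, so old fully-qualified names (and old package names) still resolve to the renamed classes. A simplified sketch of that lookup; `resolveProviderName` is an illustrative helper, not the actual method in DataSource:

```scala
// Hedged sketch: map a user-supplied provider string through the compatibility table,
// falling back to the literal name when no mapping exists. The real resolution in
// DataSource additionally tries ServiceLoader short names and "<provider>.DefaultSource".
def resolveProviderName(provider: String, compatibilityMap: Map[String, String]): String =
  compatibilityMap.getOrElse(provider, provider)

// "org.apache.spark.sql.parquet" ->
//   "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat"
```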
Original file line number Diff line number Diff line change
@@ -28,8 +28,6 @@ import org.apache.hadoop.mapreduce._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -38,15 +36,15 @@ import org.apache.spark.util.SerializableConfiguration
/**
* Provides access to CSV data from pure SQL statements.
*/
class DefaultSource extends FileFormat with DataSourceRegister {
class CSVFileFormat extends FileFormat with DataSourceRegister {

override def shortName(): String = "csv"

override def toString: String = "CSV"

override def hashCode(): Int = getClass.hashCode()

override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]

override def inferSchema(
sparkSession: SparkSession,
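The equals/hashCode overrides keyed on the class (rather than the instance) mean any two instances of the same stateless FileFormat compare equal, which is relied on when relations are compared. A small sketch under that assumption:

```scala
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat

// Hedged sketch: stateless format objects are interchangeable, so independently
// created instances compare equal and hash identically.
val a = new CSVFileFormat
val b = new CSVFileFormat
assert(a == b && a.hashCode == b.hashCode)
```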
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ import java.util.Properties
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

class DefaultSource extends RelationProvider with DataSourceRegister {
class JdbcRelationProvider extends RelationProvider with DataSourceRegister {

override def shortName(): String = "jdbc"

Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

class DefaultSource extends FileFormat with DataSourceRegister {
class JsonFileFormat extends FileFormat with DataSourceRegister {

override def shortName(): String = "json"

@@ -151,7 +151,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {

override def hashCode(): Int = getClass.hashCode()

override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
}

private[json] class JsonOutputWriter(
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

private[sql] class DefaultSource
private[sql] class ParquetFileFormat
extends FileFormat
with DataSourceRegister
with Logging
@@ -62,7 +62,7 @@ private[sql] class DefaultSource

override def hashCode(): Int = getClass.hashCode()

override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
override def equals(other: Any): Boolean = other.isInstanceOf[ParquetFileFormat]

override def prepareWrite(
sparkSession: SparkSession,
@@ -141,7 +141,7 @@ private[sql] class DefaultSource
// Should we merge schemas from all Parquet part-files?
val shouldMergeSchemas =
parameters
.get(ParquetRelation.MERGE_SCHEMA)
.get(ParquetFileFormat.MERGE_SCHEMA)
.map(_.toBoolean)
.getOrElse(sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))

@@ -217,7 +217,7 @@ private[sql] class DefaultSource
.orElse(filesByType.data.headOption)
.toSeq
}
ParquetRelation.mergeSchemasInParallel(filesToTouch, sparkSession)
ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
}

case class FileTypes(
@@ -543,7 +543,7 @@ private[sql] class ParquetOutputWriter(
override def close(): Unit = recordWriter.close(context)
}

private[sql] object ParquetRelation extends Logging {
private[sql] object ParquetFileFormat extends Logging {
// Whether we should merge schemas collected from all Parquet part-files.
private[sql] val MERGE_SCHEMA = "mergeSchema"

@@ -822,9 +822,9 @@ private[sql] object ParquetRelation extends Logging {
if (footers.isEmpty) {
Iterator.empty
} else {
var mergedSchema = ParquetRelation.readSchemaFromFooter(footers.head, converter)
var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
footers.tail.foreach { footer =>
val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
try {
mergedSchema = mergedSchema.merge(schema)
} catch { case cause: SparkException =>
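MERGE_SCHEMA, now owned by the renamed ParquetFileFormat object, corresponds to the user-facing `mergeSchema` read option (with spark.sql.parquet.mergeSchema as the session-level default). A minimal usage sketch with an illustrative path:

```scala
// Hedged sketch: request per-read schema merging across Parquet part-files.
val merged = spark.read
  .option("mergeSchema", "true")  // keyed by ParquetFileFormat.MERGE_SCHEMA
  .parquet("/tmp/parquet-table")  // illustrative path
merged.printSchema()
```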
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration
/**
* A data source for reading text files.
*/
class DefaultSource extends FileFormat with DataSourceRegister {
class TextFileFormat extends FileFormat with DataSourceRegister {

override def shortName(): String = "text"

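TextFileFormat keeps the "text" short name and its fixed single-column schema, which is why the schema-inference check in DataSource (isTextSource above) exempts it from requiring a user-specified schema. A minimal sketch with an illustrative path:

```scala
// Hedged sketch: the text source always yields a single string column named "value".
val lines = spark.read.format("text").load("/tmp/events.log") // illustrative path
lines.printSchema() // root |-- value: string (nullable = true)
```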
Original file line number Diff line number Diff line change
@@ -1320,15 +1320,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
className = classOf[DefaultSource].getCanonicalName,
className = classOf[JsonFileFormat].getCanonicalName,
options = Map("path" -> path)).resolveRelation()

val d2 = DataSource(
spark,
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
bucketSpec = None,
className = classOf[DefaultSource].getCanonicalName,
className = classOf[JsonFileFormat].getCanonicalName,
options = Map("path" -> path)).resolveRelation()
assert(d1 === d2)
})
Original file line number Diff line number Diff line change
@@ -375,7 +375,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {

ParquetRelation.mergeMetastoreParquetSchema(
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),
@@ -390,7 +390,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {

ParquetRelation.mergeMetastoreParquetSchema(
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),

@@ -401,7 +401,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {

// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
ParquetRelation.mergeMetastoreParquetSchema(
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),
@@ -412,7 +412,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {

// Conflicting non-nullable field names
intercept[Throwable] {
ParquetRelation.mergeMetastoreParquetSchema(
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
@@ -426,7 +426,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
ParquetRelation.mergeMetastoreParquetSchema(
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
@@ -439,7 +439,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
ParquetRelation.mergeMetastoreParquetSchema(
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
Original file line number Diff line number Diff line change
@@ -27,37 +27,37 @@ class ResolvedDataSourceSuite extends SparkFunSuite {
test("jdbc") {
assert(
getProvidingClass("jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
assert(
getProvidingClass("org.apache.spark.sql.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
}

test("json") {
assert(
getProvidingClass("json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
classOf[org.apache.spark.sql.execution.datasources.json.JsonFileFormat])
}

test("parquet") {
assert(
getProvidingClass("parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
assert(
getProvidingClass("org.apache.spark.sql.parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
}

test("error message for unknown data sources") {
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
path.delete()

val hadoopConf = spark.sparkContext.hadoopConfiguration
val fileFormat = new parquet.DefaultSource()
val fileFormat = new parquet.ParquetFileFormat()

def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
val df = spark
@@ -73,7 +73,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
path.delete()

val hadoopConf = spark.sparkContext.hadoopConfiguration
val fileFormat = new parquet.DefaultSource()
val fileFormat = new parquet.ParquetFileFormat()

def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
val df = spark
Original file line number Diff line number Diff line change
@@ -101,7 +101,6 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
}

class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
import testImplicits._

private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
Original file line number Diff line number Diff line change
@@ -1 +1 @@
org.apache.spark.sql.hive.orc.DefaultSource
org.apache.spark.sql.hive.orc.OrcFileFormat