Commit 55ee864

address comments.
1 parent 31b8724 commit 55ee864

9 files changed: +29 -32 lines changed

Taken together, the changes rename DataSource's inputSchema parameter to userSpecifiedSchema, drop the separate isSchemaFromUsers flag, and adjust resolveRelation() so that a user-supplied schema for a RelationProvider source is rejected only when it differs from the schema the relation itself reports.

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 1 addition & 2 deletions
@@ -144,8 +144,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       DataSource.apply(
         sparkSession,
         paths = paths,
-        inputSchema = userSpecifiedSchema,
-        isSchemaFromUsers = true,
+        userSpecifiedSchema = userSpecifiedSchema,
         className = source,
         options = extraOptions.toMap).resolveRelation())
   }
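For orientation, a minimal sketch (hypothetical user code, not part of this commit) of how a schema given to DataFrameReader reaches the renamed parameter: the value passed to .schema(...) is forwarded as userSpecifiedSchema = Some(...), so DataSource skips schema inference entirely.

  import org.apache.spark.sql.types._

  // Hypothetical example: schema and path are invented for illustration.
  val schema = new StructType().add("id", LongType).add("name", StringType)
  val df = spark.read
    .schema(schema)            // stored by the reader...
    .format("json")
    .load("/tmp/people.json")  // ...and forwarded here as userSpecifiedSchema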

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 2 additions & 3 deletions
@@ -64,8 +64,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
-        inputSchema = if (table.schema.isEmpty) None else Some(table.schema),
-        isSchemaFromUsers = true,
+        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
         options = table.storage.properties).resolveRelation()
@@ -165,7 +164,7 @@ case class CreateDataSourceTableAsSelectCommand(
     // Check if the specified data source match the data source of the existing table.
     val dataSource = DataSource(
       sparkSession = sparkSession,
-      inputSchema = Some(query.schema.asNullable),
+      userSpecifiedSchema = Some(query.schema.asNullable),
       partitionColumns = table.partitionColumnNames,
       bucketSpec = table.bucketSpec,
       className = provider,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 18 additions & 18 deletions
@@ -60,9 +60,8 @@ import org.apache.spark.util.Utils
  *
  * @param paths A list of file system paths that hold data. These will be globbed before and
  *              qualified. This option only works when reading from a [[FileFormat]].
- * @param inputSchema An optional specification of the schema of the data. When present we skip
- *                    attempting to infer the schema.
- * @param isSchemaFromUsers A flag to indicate whether the schema is specified by users.
+ * @param userSpecifiedSchema An optional specification of the schema of the data. When present
+ *                            we skip attempting to infer the schema.
  * @param partitionColumns A list of column names that the relation is partitioned by. When this
  *                         list is empty, the relation is unpartitioned.
  * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
@@ -71,8 +70,7 @@ case class DataSource(
     sparkSession: SparkSession,
     className: String,
     paths: Seq[String] = Nil,
-    inputSchema: Option[StructType] = None,
-    isSchemaFromUsers: Boolean = false,
+    userSpecifiedSchema: Option[StructType] = None,
     partitionColumns: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty) extends Logging {
@@ -189,7 +187,7 @@ case class DataSource(
   }

   private def inferFileFormatSchema(format: FileFormat): StructType = {
-    inputSchema.orElse {
+    userSpecifiedSchema.orElse {
       val caseInsensitiveOptions = new CaseInsensitiveMap(options)
       val allPaths = caseInsensitiveOptions.get("path")
       val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -213,7 +211,7 @@
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
         val (name, schema) = s.sourceSchema(
-          sparkSession.sqlContext, inputSchema, className, options)
+          sparkSession.sqlContext, userSpecifiedSchema, className, options)
         SourceInfo(name, schema)

       case format: FileFormat =>
@@ -236,7 +234,7 @@
         val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
-        if (!isSchemaInferenceEnabled && !isTextSource && inputSchema.isEmpty) {
+        if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
           throw new IllegalArgumentException(
             "Schema must be specified when creating a streaming source DataFrame. " +
             "If some files already exist in the directory, then depending on the file format " +
@@ -255,7 +253,8 @@
   def createSource(metadataPath: String): Source = {
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
-        s.createSource(sparkSession.sqlContext, metadataPath, inputSchema, className, options)
+        s.createSource(
+          sparkSession.sqlContext, metadataPath, userSpecifiedSchema, className, options)

       case format: FileFormat =>
         val path = new CaseInsensitiveMap(options).getOrElse("path", {
@@ -320,28 +319,29 @@
    */
   def resolveRelation(): BaseRelation = {
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-    val relation = (providingClass.newInstance(), inputSchema) match {
+    val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
       case (dataSource: SchemaRelationProvider, Some(schema)) =>
         dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
       case (dataSource: RelationProvider, None) =>
         dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
       case (_: SchemaRelationProvider, None) =>
         throw new AnalysisException(s"A schema needs to be specified when using $className.")
-      case (dataSource: RelationProvider, Some(_)) =>
-        if (isSchemaFromUsers) {
-          throw new AnalysisException(s"$className does not allow user-specified schemas.")
-        } else {
+      case (dataSource: RelationProvider, Some(schema)) =>
+        val baseRelation =
           dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
+        if (baseRelation.schema != schema) {
+          throw new AnalysisException(s"$className does not allow user-specified schemas.")
         }
+        baseRelation

       // We are reading from the results of a streaming query. Load files from the metadata log
       // instead of listing them using HDFS APIs.
       case (format: FileFormat, _)
           if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
-        val dataSchema = inputSchema.orElse {
+        val dataSchema = userSpecifiedSchema.orElse {
           format.inferSchema(
             sparkSession,
             caseInsensitiveOptions,
@@ -381,7 +381,7 @@

         // If they gave a schema, then we try and figure out the types of the partition columns
         // from that schema.
-        val partitionSchema = inputSchema.map { schema =>
+        val partitionSchema = userSpecifiedSchema.map { schema =>
           StructType(
             partitionColumns.map { c =>
               // TODO: Case sensitivity.
@@ -395,7 +395,7 @@
           new ListingFileCatalog(
             sparkSession, globbedPaths, options, partitionSchema)

-        val dataSchema = inputSchema.map { schema =>
+        val dataSchema = userSpecifiedSchema.map { schema =>
           val equality = sparkSession.sessionState.conf.resolver
           StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
         }.orElse {
@@ -505,7 +505,7 @@
             mode)
         sparkSession.sessionState.executePlan(plan).toRdd
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-        copy(inputSchema = Some(data.schema.asNullable)).resolveRelation()
+        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()

       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
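The only behavioral change in this file is the (RelationProvider, Some(schema)) branch of resolveRelation(): rather than consulting the removed isSchemaFromUsers flag, it now builds the relation first and rejects the user-supplied schema only if it disagrees with the schema the provider reports. A standalone sketch of the new rule, assuming provider, sqlContext, options, and className are in scope (hypothetical names, not this file's code):

  // Hedged sketch: mirrors only the comparison logic of the committed change.
  def resolveWithUserSchema(userSpecifiedSchema: Option[StructType]): BaseRelation = {
    val baseRelation = provider.createRelation(sqlContext, options)
    userSpecifiedSchema.foreach { schema =>
      // Same error message as before, but now raised only on a genuine mismatch.
      if (baseRelation.schema != schema) {
        throw new AnalysisException(s"$className does not allow user-specified schemas.")
      }
    }
    baseRelation
  }

In practice a caller can now pass the exact schema a RelationProvider-only source derives on its own without hitting the error, which the old flag-based check made impossible from the public read APIs.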

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 1 addition & 1 deletion
@@ -201,7 +201,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
         val dataSource =
           DataSource(
             sparkSession,
-            inputSchema = Some(table.schema),
+            userSpecifiedSchema = Some(table.schema),
             partitionColumns = table.partitionColumnNames,
             bucketSpec = table.bucketSpec,
             className = table.provider.get,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala

Lines changed: 1 addition & 2 deletions
@@ -55,8 +55,7 @@ case class CreateTempViewUsing(
   def run(sparkSession: SparkSession): Seq[Row] = {
     val dataSource = DataSource(
       sparkSession,
-      inputSchema = userSpecifiedSchema,
-      isSchemaFromUsers = true,
+      userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       options = options)
     sparkSession.sessionState.catalog.createTempView(

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 1 addition & 1 deletion
@@ -133,7 +133,7 @@ class FileStreamSource(
       DataSource(
         sparkSession,
         paths = files.map(_.path),
-        inputSchema = Some(schema),
+        userSpecifiedSchema = Some(schema),
         className = fileFormatClassName,
         options = sourceOptions.optionMapWithoutPath)
     Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 1 addition & 1 deletion
@@ -136,7 +136,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     val dataSource =
       DataSource(
         sparkSession,
-        inputSchema = userSpecifiedSchema,
+        userSpecifiedSchema = userSpecifiedSchema,
         className = source,
         options = extraOptions.toMap)
     Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
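The streaming reader mirrors the batch path: whatever was set via DataStreamReader.schema(...) is handed to DataSource as userSpecifiedSchema. A hypothetical streaming read (schema and path invented for illustration); per the DataSource.scala change above, a non-text file source must supply a schema when streaming schema inference is disabled:

  import org.apache.spark.sql.types._

  // Hypothetical example: the schema below reaches DataSource as
  // userSpecifiedSchema, so the JSON source starts without inference.
  val eventSchema = new StructType().add("ts", TimestampType).add("msg", StringType)
  val events = spark.readStream
    .schema(eventSchema)
    .json("/tmp/incoming")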

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -1346,15 +1346,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

     val d1 = DataSource(
       spark,
-      inputSchema = None,
+      userSpecifiedSchema = None,
       partitionColumns = Array.empty[String],
       bucketSpec = None,
       className = classOf[JsonFileFormat].getCanonicalName,
       options = Map("path" -> path)).resolveRelation()

     val d2 = DataSource(
       spark,
-      inputSchema = None,
+      userSpecifiedSchema = None,
       partitionColumns = Array.empty[String],
       bucketSpec = None,
       className = classOf[JsonFileFormat].getCanonicalName,

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 2 additions & 2 deletions
@@ -74,7 +74,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val dataSource =
       DataSource(
         sparkSession,
-        inputSchema = Some(table.schema),
+        userSpecifiedSchema = Some(table.schema),
         partitionColumns = table.partitionColumnNames,
         bucketSpec = table.bucketSpec,
         className = table.provider.get,
@@ -278,7 +278,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       DataSource(
         sparkSession = sparkSession,
         paths = paths,
-        inputSchema = Some(metastoreRelation.schema),
+        userSpecifiedSchema = Some(metastoreRelation.schema),
         bucketSpec = bucketSpec,
         options = options,
         className = fileType).resolveRelation(),
