
Commit 111025f

address comments.
1 parent c6ef287 commit 111025f

File tree

3 files changed (+99, -86 lines changed)


sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -218,7 +218,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       bucketSpec = getBucketSpec,
       options = extraOptions.toMap)

-    dataSource.write(mode, df, isForWriteOnly = true)
+    dataSource.write(mode, df)
   }

   /**
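For context, a small user-level example (not part of this diff) of a call that goes through this write-only path: DataFrameWriter.save builds a DataSource and now simply calls write(mode, df), with no BaseRelation returned. The output path and sample data below are made up.

import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteOnlyExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("write-only-example").getOrCreate()
    import spark.implicits._

    // Build a tiny DataFrame and save it; a plain save never needs the written
    // relation back, so DataFrameWriter only has to call DataSource.write.
    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
    df.write.mode(SaveMode.Overwrite).parquet("/tmp/write_only_example")

    spark.stop()
  }
}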

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 1 addition & 3 deletions
@@ -199,9 +199,7 @@ case class CreateDataSourceTableAsSelectCommand(
         catalogTable = if (tableExists) Some(table) else None)

     try {
-      dataSource.write(mode, Dataset.ofRows(session, query)).getOrElse {
-        throw new AnalysisException(s"Expected a BaseRelation but found None")
-      }
+      dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 97 additions & 82 deletions
@@ -414,98 +414,113 @@ case class DataSource(
   }

   /**
-   * Writes the given [[DataFrame]] out to this [[DataSource]].
-   *
-   * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
    */
-  def write(
-      mode: SaveMode,
-      data: DataFrame,
-      isForWriteOnly: Boolean = false): Option[BaseRelation] = {
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+    // Don't glob path for the write path. The contracts here are:
+    // 1. Only one output path can be specified on the write path;
+    // 2. Output path must be a legal HDFS style file system path;
+    // 3. It's OK that the output path doesn't exist yet;
+    val allPaths = paths ++ caseInsensitiveOptions.get("path")
+    val outputPath = if (allPaths.length == 1) {
+      val path = new Path(allPaths.head)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${allPaths.mkString(", ")}")
+    }
+
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
+
+    // If we are appending to a table that already exists, make sure the partitioning matches
+    // up. If we fail to load the table for whatever reason, ignore the check.
+    if (mode == SaveMode.Append) {
+      val existingPartitionColumns = Try {
+        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+      }.getOrElse(Seq.empty[String])
+      // TODO: Case sensitivity.
+      val sameColumns =
+        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingPartitionColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
+    }
+
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val partitionAttributes = partitionColumns.map { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+    val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+      sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+      }.head
+    }
+    // For partitioned relation r, r.schema's column ordering can be different from the column
+    // ordering of data.logicalPlan (partition columns are all moved after data column). This
+    // will be adjusted within InsertIntoHadoopFsRelation.
+    val plan =
+      InsertIntoHadoopFsRelationCommand(
+        outputPath = outputPath,
+        staticPartitions = Map.empty,
+        partitionColumns = partitionAttributes,
+        bucketSpec = bucketSpec,
+        fileFormat = format,
+        options = options,
+        query = data.logicalPlan,
+        mode = mode,
+        catalogTable = catalogTable,
+        fileIndex = fileIndex)
+    sparkSession.sessionState.executePlan(plan).toRdd
+  }
+
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
+   * the following reading.
+   */
+  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }

     providingClass.newInstance() match {
       case dataSource: CreatableRelationProvider =>
-        Some(dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data))
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
       case format: FileFormat =>
-        // Don't glob path for the write path. The contracts here are:
-        // 1. Only one output path can be specified on the write path;
-        // 2. Output path must be a legal HDFS style file system path;
-        // 3. It's OK that the output path doesn't exist yet;
-        val allPaths = paths ++ caseInsensitiveOptions.get("path")
-        val outputPath = if (allPaths.length == 1) {
-          val path = new Path(allPaths.head)
-          val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        } else {
-          throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
-            s"got: ${allPaths.mkString(", ")}")
-        }
-
-        val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumn(
-          data.schema, partitionColumns, caseSensitive)
-
-        // If we are appending to a table that already exists, make sure the partitioning matches
-        // up. If we fail to load the table for whatever reason, ignore the check.
-        if (mode == SaveMode.Append) {
-          val existingPartitionColumns = Try {
-            getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-          }.getOrElse(Seq.empty[String])
-          // TODO: Case sensitivity.
-          val sameColumns =
-            existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-          if (existingPartitionColumns.nonEmpty && !sameColumns) {
-            throw new AnalysisException(
-              s"""Requested partitioning does not match existing partitioning.
-                 |Existing partitioning columns:
-                 |  ${existingPartitionColumns.mkString(", ")}
-                 |Requested partitioning columns:
-                 |  ${partitionColumns.mkString(", ")}
-                 |""".stripMargin)
-          }
-        }
+        writeInFileFormat(format, mode, data)
+        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
+        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+      case _ =>
+        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+    }
+  }

-        // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-        // not need to have the query as child, to avoid to analyze an optimized query,
-        // because InsertIntoHadoopFsRelationCommand will be optimized first.
-        val partitionAttributes = partitionColumns.map { name =>
-          val plan = data.logicalPlan
-          plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-            throw new AnalysisException(
-              s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
-          }.asInstanceOf[Attribute]
-        }
-        val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
-          sparkSession.table(tableIdent).queryExecution.analyzed.collect {
-            case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
-          }.head
-        }
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column). This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        val plan =
-          InsertIntoHadoopFsRelationCommand(
-            outputPath = outputPath,
-            staticPartitions = Map.empty,
-            partitionColumns = partitionAttributes,
-            bucketSpec = bucketSpec,
-            fileFormat = format,
-            options = options,
-            query = data.logicalPlan,
-            mode = mode,
-            catalogTable = catalogTable,
-            fileIndex = fileIndex)
-        sparkSession.sessionState.executePlan(plan).toRdd
-        if (isForWriteOnly) {
-          None
-        } else {
-          // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
-          Some(copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation())
-        }
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   */
+  def write(mode: SaveMode, data: DataFrame): Unit = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }

+    providingClass.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+      case format: FileFormat =>
+        writeInFileFormat(format, mode, data)
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
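Taken together, the refactoring splits the old write(mode, data, isForWriteOnly) into three methods. A condensed, hypothetical skeleton of the resulting API is sketched below; the trait name DataSourceWriteApi is invented for illustration, while the real methods live on DataSource and writeInFileFormat is private there.

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.sources.BaseRelation

trait DataSourceWriteApi {
  // Shared FileFormat path: validate the single output path and the requested
  // partitioning, then run InsertIntoHadoopFsRelationCommand.
  protected def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit

  // CTAS path: write the data, then resolve a relation with the written schema
  // so it does not have to be re-inferred.
  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation

  // Plain save path: write the data, return nothing.
  def write(mode: SaveMode, data: DataFrame): Unit
}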
