
Commit 2b6694d

DataFrameWriter.save doesn’t have to be a table.
1 parent a0e81fc commit 2b6694d
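
In user-facing terms, a minimal sketch of what the commit title means, assuming a SparkSession named `spark` with its implicits imported, an illustrative output path, and the V2 write path enabled (i.e. the writer fall-back list left at its new empty default): a save to a plain path no longer has to resolve to a table, so overwriting that path with a DataFrame of a different schema goes through the V2 write builder and succeeds, which is what the new test at the bottom of this commit verifies.

import spark.implicits._

val path = "/tmp/v2-save-demo"                      // hypothetical output path
spark.range(10).write.format("orc").save(path)
spark.range(20).map(id => (id.toDouble, id.toString)).toDF("double", "string")
  .write.format("orc").mode("overwrite").save(path)
spark.read.format("orc").load(path).printSchema()   // now shows: double, string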

File tree: 3 files changed, +37 -33 lines changed

  sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
  sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 1 deletion
@@ -1452,7 +1452,7 @@ object SQLConf {
      " register class names for which data source V2 write paths are disabled. Writes from these" +
      " sources will fall back to the V1 sources.")
    .stringConf
-    .createWithDefault("orc")
+    .createWithDefault("")

  val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
    .doc("A comma-separated list of fully qualified data source register class names for which" +
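
The default for this fall-back list changes from "orc" to empty, so ORC writes now take the data source V2 path by default. A minimal sketch of restoring the previous behavior for ORC only, assuming the entry edited above is SQLConf.USE_V1_SOURCE_WRITER_LIST (the constant referenced by the updated test suite below) and a running SparkSession named `spark`:

import org.apache.spark.sql.internal.SQLConf

// Put ORC back on the V1 write path; the new default ("") leaves the
// V2 write path enabled for every registered source.
spark.conf.set(SQLConf.USE_V1_SOURCE_WRITER_LIST.key, "orc")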

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 15 additions & 30 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable,
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode

@@ -265,38 +265,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
      val dsOptions = new DataSourceOptions(options.asJava)
      provider.getTable(dsOptions) match {
        case table: SupportsBatchWrite =>
-          lazy val relation = DataSourceV2Relation.create(table, options)
-          mode match {
-            case SaveMode.Append =>
-              runCommand(df.sparkSession, "save") {
-                AppendData.byName(relation, df.logicalPlan)
-              }
-
-            case SaveMode.Overwrite =>
-              // truncate the table
-              runCommand(df.sparkSession, "save") {
-                OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
+          table.newWriteBuilder(dsOptions) match {
+            case writeBuilder: SupportsSaveMode =>
+              val write = writeBuilder.mode(mode)
+                .withQueryId(UUID.randomUUID().toString)
+                .withInputDataSchema(df.logicalPlan.schema)
+                .buildForBatch()
+              // It can only return null with `SupportsSaveMode`. We can clean it up after
+              // removing `SupportsSaveMode`.
+              if (write != null) {
+                runCommand(df.sparkSession, "save") {
+                  WriteToDataSourceV2(write, df.logicalPlan)
+                }
              }

            case _ =>
-              table.newWriteBuilder(dsOptions) match {
-                case writeBuilder: SupportsSaveMode =>
-                  val write = writeBuilder.mode(mode)
-                    .withQueryId(UUID.randomUUID().toString)
-                    .withInputDataSchema(df.logicalPlan.schema)
-                    .buildForBatch()
-                  // It can only return null with `SupportsSaveMode`. We can clean it up after
-                  // removing `SupportsSaveMode`.
-                  if (write != null) {
-                    runCommand(df.sparkSession, "save") {
-                      WriteToDataSourceV2(write, df.logicalPlan)
-                    }
-                  }
-
-                case _ =>
-                  throw new AnalysisException(
-                    s"data source ${table.name} does not support SaveMode $mode")
-              }
+              throw new AnalysisException(
+                s"data source ${table.name} does not support SaveMode $mode")
          }

        // Streaming also uses the data source V2 API. So it may be that the data source implements

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 21 additions & 2 deletions
@@ -329,7 +329,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
  test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
    withTempDir { dir =>
      val tempDir = new File(dir, "files").getCanonicalPath
-      Seq(true).foreach { useV1 =>
+      Seq(true, false).foreach { useV1 =>
        val useV1List = if (useV1) {
          "orc"
        } else {

@@ -374,7 +374,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
  }

  test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    Seq(true).foreach { useV1 =>
+    Seq(true, false).foreach { useV1 =>
      val useV1List = if (useV1) {
        "orc"
      } else {

@@ -469,6 +469,25 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
    }
  }

+  test("File data sources V2 supports overwriting with different schema") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "") {
+      Seq("orc", "parquet", "json").foreach { format =>
+        withTempPath { p =>
+          val path = p.getCanonicalPath
+          spark.range(10).write.format(format).save(path)
+          val newDF = spark.range(20).map(id => (id.toDouble, id.toString)).toDF("double", "string")
+          newDF.write.format(format).mode("overwrite").save(path)
+
+          val readDF = spark.read.format(format).load(path)
+          val expectedSchema = StructType(Seq(
+            StructField("double", DoubleType, true), StructField("string", StringType, true)))
+          assert(readDF.schema == expectedSchema)
+          checkAnswer(readDF, newDF)
+        }
+      }
+    }
+  }
+
  test("SPARK-25237 compute correct input metrics in FileScanRDD") {
    withTempPath { p =>
      val path = p.getAbsolutePath
