Skip to content

Commit 2386a15

Browse files
yaooqinncloud-fan
authored andcommitted
[SPARK-31321][SQL] Remove SaveMode check in v2 FileWriteBuilder
### What changes were proposed in this pull request? The `SaveMode` is resolved before we create `FileWriteBuilder` to build `BatchWrite`. In #25876, we removed save mode for DSV2 from DataFrameWriter. So that the `mode` method is never used which makes `validateInputs` fail determinately without `mode` set. ### Why are the changes needed? rm dead code. ### Does this PR introduce any user-facing change? no ### How was this patch tested? existing tests. Closes #28090 from yaooqinn/SPARK-31321. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 1ce584f) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent a852dbc commit 2386a15

File tree

1 file changed

+3
-29
lines changed

1 file changed

+3
-29
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.spark.sql.execution.datasources.v2
1818

19-
import java.io.IOException
2019
import java.util.UUID
2120

2221
import scala.collection.JavaConverters._
@@ -27,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
2726
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
2827

2928
import org.apache.spark.internal.io.FileCommitProtocol
30-
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
29+
import org.apache.spark.sql.{AnalysisException, SparkSession}
3130
import org.apache.spark.sql.catalyst.InternalRow
3231
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
3332
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
@@ -46,12 +45,6 @@ abstract class FileWriteBuilder(
4645
private val schema = info.schema()
4746
private val queryId = info.queryId()
4847
private val options = info.options()
49-
private var mode: SaveMode = _
50-
51-
def mode(mode: SaveMode): WriteBuilder = {
52-
this.mode = mode
53-
this
54-
}
5548

5649
override def buildForBatch(): BatchWrite = {
5750
val sparkSession = SparkSession.active
@@ -68,26 +61,8 @@ abstract class FileWriteBuilder(
6861
lazy val description =
6962
createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)
7063

71-
val fs = path.getFileSystem(hadoopConf)
72-
mode match {
73-
case SaveMode.ErrorIfExists if fs.exists(path) =>
74-
val qualifiedOutputPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
75-
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
76-
77-
case SaveMode.Ignore if fs.exists(path) =>
78-
null
79-
80-
case SaveMode.Overwrite =>
81-
if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
82-
throw new IOException(s"Unable to clear directory $path prior to writing to it")
83-
}
84-
committer.setupJob(job)
85-
new FileBatchWrite(job, description, committer)
86-
87-
case _ =>
88-
committer.setupJob(job)
89-
new FileBatchWrite(job, description, committer)
90-
}
64+
committer.setupJob(job)
65+
new FileBatchWrite(job, description, committer)
9166
}
9267

9368
/**
@@ -104,7 +79,6 @@ abstract class FileWriteBuilder(
10479
private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
10580
assert(schema != null, "Missing input data schema")
10681
assert(queryId != null, "Missing query ID")
107-
assert(mode != null, "Missing save mode")
10882

10983
if (paths.length != 1) {
11084
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +

0 commit comments

Comments
 (0)