
Commit dfdf1bb

Fangshi Li authored and cloud-fan committed
[SPARK-23815][CORE] Spark writer dynamic partition overwrite mode may fail to write output on multi-level partitions
## What changes were proposed in this pull request?

Spark introduced a new writer mode in SPARK-20236 that overwrites only the partitions related to the write. While using this feature on our production cluster, we found a bug when writing multi-level partitions on HDFS.

A simple test case to reproduce the issue:

    val df = Seq(("1","2","3")).toDF("col1", "col2","col3")
    df.write.partitionBy("col1","col2").mode("overwrite").save("/my/hdfs/location")

If the HDFS location /my/hdfs/location does not exist, there will be no output.

This is caused by the job-commit change that SPARK-20236 made in HadoopMapReduceCommitProtocol. During job commit, the output has already been written to the staging dir /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2, and the code then calls fs.rename to move /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2 to /my/hdfs/location/col1=1/col2=2. In our case the rename fails on HDFS because /my/hdfs/location/col1=1 does not exist, and an HDFS rename cannot create more than one level of missing directories. The unit test added with SPARK-20236 does not catch this because it uses the local file system.

We propose the following fix: when cleaning the current partition dir /my/hdfs/location/col1=1/col2=2 before the rename, if the delete fails (because /my/hdfs/location/col1=1/col2=2 may not exist), call mkdirs to create the parent dir /my/hdfs/location/col1=1 (if it does not already exist) so that the subsequent rename can succeed.

Reference: the official HDFS FileSystem specification (https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) gives the rename operation the precondition "dest must be root, or have a parent that exists".

## How was this patch tested?

We have tested this patch on our production cluster and it fixed the problem.

Author: Fangshi Li <fli@linkedin.com>

Closes apache#20931 from fangshil/master.

(cherry picked from commit 4b07036)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 2995b79 commit dfdf1bb
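
Below is a self-contained sketch of the reproduction described in the commit message, not part of the commit itself. It assumes the default FileSystem is HDFS and that the output path does not yet exist; the object name, app name, and path are illustrative, and dynamic partition overwrite is enabled explicitly via the spark.sql.sources.partitionOverwriteMode conf that SPARK-20236 introduced.

```scala
// Hypothetical reproduction sketch for SPARK-23815 (not part of the commit).
// Assumes the default FileSystem is HDFS and /my/hdfs/location does not exist yet.
import org.apache.spark.sql.SparkSession

object Spark23815Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-23815-repro")
      // Enable the dynamic partition overwrite mode introduced by SPARK-20236.
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("1", "2", "3")).toDF("col1", "col2", "col3")

    // Before the fix, commitJob's rename of
    // /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2 to
    // /my/hdfs/location/col1=1/col2=2 failed on HDFS because the parent dir
    // col1=1 did not exist, so the write silently produced no output.
    df.write
      .partitionBy("col1", "col2")
      .mode("overwrite")
      .save("/my/hdfs/location")

    spark.stop()
  }
}
```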

File tree

1 file changed: +11 lines, -1 line


core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 11 additions & 1 deletion

```diff
@@ -186,7 +186,17 @@ class HadoopMapReduceCommitProtocol(
         logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
         for (part <- partitionPaths) {
           val finalPartPath = new Path(path, part)
-          fs.delete(finalPartPath, true)
+          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+            // According to the official hadoop FileSystem API spec, delete op should assume
+            // the destination is no longer present regardless of return value, thus we do not
+            // need to double check if finalPartPath exists before rename.
+            // Also in our case, based on the spec, delete returns false only when finalPartPath
+            // does not exist. When this happens, we need to take action if parent of finalPartPath
+            // also does not exist(e.g. the scenario described on SPARK-23815), because
+            // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
+            // a parent that exists, otherwise we may get unexpected result on the rename.
+            fs.mkdirs(finalPartPath.getParent)
+          }
           fs.rename(new Path(stagingDir, part), finalPartPath)
         }
       }
```
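
For reference, the delete/mkdirs/rename sequence the patch relies on can be exercised in isolation against the Hadoop FileSystem API. The following is a minimal sketch, not part of the commit; the paths (including the .spark-staging.xxx placeholder reused from the description) are illustrative.

```scala
// Standalone sketch of the commit's recovery logic (hypothetical paths).
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

val finalPartPath = new Path("/my/hdfs/location/col1=1/col2=2")
val stagingPart   = new Path("/my/hdfs/location/.spark-staging.xxx/col1=1/col2=2")

// Per the FileSystem spec, delete() returning false here means finalPartPath
// was already absent, so there is no need to re-check it before the rename.
if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
  // rename() requires its destination to have an existing parent
  // ("dest must be root, or have a parent that exists"), and an HDFS rename
  // will not create missing intermediate directories, so create them first.
  fs.mkdirs(finalPartPath.getParent)
}
fs.rename(stagingPart, finalPartPath)
```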
