
Commit b5856dd

Yuzhou Sun authored and viirya committed
[SPARK-35106][CORE][SQL] Avoid failing rename caused by destination directory not exist
### What changes were proposed in this pull request?

1. In HadoopMapReduceCommitProtocol, create the parent directory before renaming custom partition path staging files.
2. In InMemoryCatalog and HiveExternalCatalog, create the new partition directory before renaming the old partition path.
3. Check the return value of FileSystem#rename; if it is false, throw an exception to avoid silent data loss caused by a rename failure.
4. Change DebugFilesystem#rename to match HDFS's behavior (return false without renaming when the destination's parent directory does not exist).

### Why are the changes needed?

Depending on the FileSystem#rename implementation, when the destination directory does not exist the file system may
1. return false without renaming the file or throwing an exception (e.g. HDFS), or
2. create the destination directory, rename the files, and return true (e.g. LocalFileSystem).

In the first case, the renames in HadoopMapReduceCommitProtocol for custom partition paths fail silently if the destination partition path does not exist. Failed renames can happen when
1. dynamicPartitionOverwrite == true and the custom partition path directories are deleted by the job before the rename; or
2. the custom partition path directories do not exist before the job; or
3. something else goes wrong while the file system handles `rename`.

The renames in InMemoryCatalog and HiveExternalCatalog for partition renaming have a similar issue.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Modified DebugFilesystem#rename and added new unit tests. Without the fix in the source code, five InsertSuite tests and one AlterTableRenamePartitionSuite test fail:

InsertSuite.SPARK-20236: dynamic partition overwrite with custom partition path (existing test with modified FS)
```
== Results ==
!== Correct Answer - 1 ==   == Spark Answer - 0 ==
 struct<>                   struct<>
![2,1,1]
```

InsertSuite.SPARK-35106: insert overwrite with custom partition path
```
== Results ==
!== Correct Answer - 1 ==   == Spark Answer - 0 ==
 struct<>                   struct<>
![2,1,1]
```

InsertSuite.SPARK-35106: dynamic partition overwrite with custom partition path
```
== Results ==
!== Correct Answer - 2 ==   == Spark Answer - 1 ==
!struct<>                   struct<i:int,part1:int,part2:int>
 [1,1,1]                    [1,1,1]
![1,1,2]
```

InsertSuite.SPARK-35106: Throw exception when rename custom partition paths returns false
```
Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
```

InsertSuite.SPARK-35106: Throw exception when rename dynamic partition paths returns false
```
Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
```

AlterTableRenamePartitionSuite.ALTER TABLE .. RENAME PARTITION V1: multi part partition (existing test with modified FS)
```
== Results ==
!== Correct Answer - 1 ==   == Spark Answer - 0 ==
 struct<>                   struct<>
![3,123,3]
```

Closes apache#32530 from YuzhouSun/SPARK-35106.

Authored-by: Yuzhou Sun <yuzhosun@amazon.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit a72d05c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
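All three code paths apply the same defensive pattern: create the destination's parent directory up front, then treat a `false` return from `FileSystem#rename` as an error rather than ignoring it. A minimal sketch of that pattern (simplified; `renameOrThrow` is an illustrative helper, not a function added by this patch):

```scala
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

object RenameUtil {
  // On HDFS-like file systems, renaming into a missing directory returns
  // false without throwing, so a bare fs.rename(src, dst) can silently
  // drop data. Create the parent first and check the return value.
  def renameOrThrow(fs: FileSystem, src: Path, dst: Path): Unit = {
    fs.mkdirs(dst.getParent)
    if (!fs.rename(src, dst)) {
      throw new IOException(s"Failed to rename $src to $dst")
    }
  }
}
```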
1 parent 4385eae · commit b5856dd

File tree: 5 files changed, +151 −10 lines

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 14 additions & 5 deletions

```diff
@@ -188,13 +188,18 @@ class HadoopMapReduceCommitProtocol(

       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
       logDebug(s"Committing files staged for absolute locations $filesToMove")
+      val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet
       if (dynamicPartitionOverwrite) {
-        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
-        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
-        absPartitionPaths.foreach(fs.delete(_, true))
+        logDebug(s"Clean up absolute partition directories for overwriting: $absParentPaths")
+        absParentPaths.foreach(fs.delete(_, true))
       }
+      logDebug(s"Create absolute parent directories: $absParentPaths")
+      absParentPaths.foreach(fs.mkdirs)
       for ((src, dst) <- filesToMove) {
-        fs.rename(new Path(src), new Path(dst))
+        if (!fs.rename(new Path(src), new Path(dst))) {
+          throw new IOException(s"Failed to rename $src to $dst when committing files staged for " +
+            s"absolute locations")
+        }
       }

       if (dynamicPartitionOverwrite) {
@@ -213,7 +218,11 @@ class HadoopMapReduceCommitProtocol(
            // a parent that exists, otherwise we may get unexpected result on the rename.
            fs.mkdirs(finalPartPath.getParent)
          }
-         fs.rename(new Path(stagingDir, part), finalPartPath)
+         val stagingPartPath = new Path(stagingDir, part)
+         if (!fs.rename(stagingPartPath, finalPartPath)) {
+           throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " +
+             s"committing files staged for overwriting dynamic partitions")
+         }
        }
      }

```
core/src/test/scala/org/apache/spark/DebugFilesystem.scala

Lines changed: 12 additions & 2 deletions

```diff
@@ -57,8 +57,14 @@ object DebugFilesystem extends Logging {
 }

 /**
- * DebugFilesystem wraps file open calls to track all open connections. This can be used in tests
- * to check that connections are not leaked.
+ * DebugFilesystem wraps
+ * 1) file open calls to track all open connections. This can be used in tests to check that
+ * connections are not leaked;
+ * 2) rename calls to return false when destination's parent path does not exist. When
+ * destination parent does not exist, LocalFileSystem uses FileUtil#copy to copy the
+ * file and returns true if succeed, while many other hadoop file systems (e.g. HDFS, S3A)
+ * return false without renaming any file. This helps to test that Spark can work with the
+ * latter file systems.
  */
 // TODO(ekl) we should consider always interposing this to expose num open conns as a metric
 class DebugFilesystem extends LocalFileSystem {
@@ -120,4 +126,8 @@ class DebugFilesystem extends LocalFileSystem {
       override def hashCode(): Int = wrapped.hashCode()
     }
   }
+
+  override def rename(src: Path, dst: Path): Boolean = {
+    exists(dst.getParent) && super.rename(src, dst)
+  }
 }
```
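With the override above, tests that route the `file://` scheme through DebugFilesystem see HDFS-like rename semantics on a local disk. A hypothetical standalone check (the paths are illustrative, and `DebugFilesystem` lives in Spark's core test sources, so it is assumed to be on the classpath):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.DebugFilesystem

object DebugFsDemo extends App {
  // Route file:// through DebugFilesystem, the same way the InsertSuite
  // tests below register their custom file systems via fs.file.impl.
  val conf = new Configuration()
  conf.set("fs.file.impl", classOf[DebugFilesystem].getName)
  conf.setBoolean("fs.file.impl.disable.cache", true)

  val fs = new Path("file:///tmp").getFileSystem(conf)
  val src = new Path("file:///tmp/spark-35106-demo-src")
  fs.create(src).close()

  // The destination's parent does not exist, so rename returns false
  // (matching HDFS/S3A) instead of copying the file as LocalFileSystem would.
  assert(!fs.rename(src, new Path("file:///tmp/no-such-dir/dst")))
}
```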

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 5 additions & 1 deletion

```diff
@@ -499,7 +499,11 @@ class InMemoryCatalog(
           newSpec, partitionColumnNames, tablePath)
         try {
           val fs = tablePath.getFileSystem(hadoopConfig)
-          fs.rename(oldPartPath, newPartPath)
+          fs.mkdirs(newPartPath)
+          if(!fs.rename(oldPartPath, newPartPath)) {
+            throw new IOException(s"Renaming partition path from $oldPartPath to " +
+              s"$newPartPath returned false")
+          }
         } catch {
           case e: IOException =>
             throw new SparkException(s"Unable to rename partition path $oldPartPath", e)
```
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 115 additions & 1 deletion

```diff
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.sources

-import java.io.File
+import java.io.{File, IOException}
 import java.sql.Date

 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}
@@ -950,6 +950,110 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       checkAnswer(spark.table("t2"), Nil)
     }
   }
+
+  test("SPARK-35106: insert overwrite with custom partition path") {
+    withTempPath { path =>
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        sql(s"alter table t add partition(part1=1, part2=1) location '${path.getAbsolutePath}'")
+        sql(s"insert into t partition(part1=1, part2=1) select 1")
+        checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+        sql("insert overwrite table t partition(part1=1, part2=1) select 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1))
+
+        sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2=2) select 3")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
+        checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Nil)
+      }
+    }
+  }
+
+  test("SPARK-35106: dynamic partition overwrite with custom partition path") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTempPath { path =>
+        withTable("t") {
+          sql(
+            """
+              |create table t(i int, part1 int, part2 int) using parquet
+              |partitioned by (part1, part2)
+            """.stripMargin)
+
+          sql(s"insert into t partition(part1=1, part2=1) select 1")
+          checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+          sql(s"alter table t add partition(part1=1, part2=2) location '${path.getAbsolutePath}'")
+
+          // dynamic partition overwrite to empty custom partition
+          sql(s"insert overwrite table t partition(part1=1, part2=2) select 1")
+          checkAnswer(spark.table("t"), Row(1, 1, 1) :: Row(1, 1, 2) :: Nil)
+
+          // dynamic partition overwrite to non-empty custom partition
+          sql("insert overwrite table t partition(part1=1, part2=2) select 2")
+          checkAnswer(spark.table("t"), Row(1, 1, 1) :: Row(2, 1, 2) :: Nil)
+        }
+      }
+    }
+  }
+
+  test("SPARK-35106: Throw exception when rename custom partition paths returns false") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempPath { path =>
+        withTable("t") {
+          sql(
+            """
+              |create table t(i int, part1 int, part2 int) using parquet
+              |partitioned by (part1, part2)
+            """.stripMargin)
+
+          sql(s"alter table t add partition(part1=1, part2=1) location '${path.getAbsolutePath}'")
+
+          val e = intercept[SparkException] {
+            sql(s"insert into t partition(part1=1, part2=1) select 1")
+          }.getCause
+          assert(e.isInstanceOf[IOException])
+          assert(e.getMessage.contains("Failed to rename"))
+          assert(e.getMessage.contains("when committing files staged for absolute location"))
+        }
+      }
+    }
+  }
+
+  test("SPARK-35106: Throw exception when rename dynamic partition paths returns false") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName,
+      "fs.file.impl.disable.cache" -> "true",
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        val e = intercept[SparkException] {
+          sql(s"insert overwrite table t partition(part1, part2) values (1, 1, 1)")
+        }.getCause
+        assert(e.isInstanceOf[IOException])
+        assert(e.getMessage.contains("Failed to rename"))
+        assert(e.getMessage.contains(
+          "when committing files staged for overwriting dynamic partitions"))
+      }
+    }
+  }
 }

 class FileExistingTestFileSystem extends RawLocalFileSystem {
@@ -962,3 +1066,13 @@ class FileExistingTestFileSystem extends RawLocalFileSystem {
     throw new FileAlreadyExistsException(s"${f.toString} already exists")
   }
 }
+
+class RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    (!isSparkStagingDir(src) || isSparkStagingDir(dst)) && super.rename(src, dst)
+  }
+
+  private def isSparkStagingDir(path: Path): Boolean = {
+    path.toString.contains(".spark-staging-")
+  }
+}
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 5 additions & 1 deletion

```diff
@@ -1081,7 +1081,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // scalastyle:on caselocale
         val actualPartitionPath = new Path(currentFullPath, actualPartitionString)
         try {
-          fs.rename(actualPartitionPath, expectedPartitionPath)
+          fs.mkdirs(expectedPartitionPath)
+          if(!fs.rename(actualPartitionPath, expectedPartitionPath)) {
+            throw new IOException(s"Renaming partition path from $actualPartitionPath to " +
+              s"$expectedPartitionPath returned false")
+          }
         } catch {
           case e: IOException =>
             throw new SparkException("Unable to rename partition path from " +
```
