Commit 03eb318

pan3793 authored and LuciferYang committed
[SPARK-52615][CORE] Replace File.mkdirs with Utils.createDirectory
### What changes were proposed in this pull request?

We hit an issue where `File.mkdirs()` may occasionally fail with no error during the submission phase (we didn't configure `spark.yarn.archive` in that cluster, so each submission requires packaging and uploading the Spark client jars, which consumes a lot of disk I/O). This was also mentioned in `Utils.createDirectory`:

> // SPARK-35907: The check was required by File.mkdirs() because it could sporadically
> // fail silently. ...

```
25/06/27 19:12:17 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/06/27 19:12:17 ERROR Utils: Failed to create dir in ./local. Ignoring this directory.
25/06/27 19:12:17 INFO Client: Deleted staging directory hdfs://<cluster>/user/<user>/.sparkStaging/application_1747844918192_28291290
Exception in thread "main" java.io.IOException: Failed to get a temp directory under [./local].
  at org.apache.spark.util.Utils$.getLocalDir(Utils.scala:896)
  at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:672)
  at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:1005)
  at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:231)
  at org.apache.spark.deploy.yarn.Client.run(Client.scala:1352)
  at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1800)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1019)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1107)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1116)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

So I replaced `File.mkdirs` with `Utils.createDirectory` and deployed the change to our internal cluster; no similar failure has occurred since ... (not sure why, maybe the NIO method used by the replacement is more robust?)

Additional context: [JDK-4227544](https://bugs.openjdk.org/browse/JDK-4227544) "design bug: File.mkdir(), etc. don't provide reason for failure" was closed as "Won't Fix":

> It is too late to change this now. The new io framework will handle this situation better.

### Why are the changes needed?

To achieve better error reporting when creating a directory fails.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I made this change in our internal Spark and deployed it to a busy YARN cluster; the submit process has been stable so far.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51322 from pan3793/SPARK-52615.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
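For illustration only (not part of the commit message): a minimal Scala sketch of the contrast the author describes, between the silent-failure `File.mkdirs()` API and the NIO `Files.createDirectories()` underlying the replacement. The object name and path below are hypothetical.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

object MkdirsDemo {
  def main(args: Array[String]): Unit = {
    val dir = new File("./local/demo") // hypothetical path, not Spark's

    // Legacy API: File.mkdirs() reports failure only as `false`, with no
    // reason attached (JDK-4227544), so the caller can't explain what went wrong.
    if (!dir.mkdirs() && !dir.isDirectory) {
      System.err.println(s"Failed to create dir $dir -- but why?")
    }

    // NIO API: Files.createDirectories() throws an IOException carrying the
    // underlying cause, so the failure can be reported meaningfully.
    try {
      Files.createDirectories(dir.toPath)
    } catch {
      case e: IOException => System.err.println(s"Failed to create dir $dir: $e")
    }
  }
}
```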
1 parent 2bd93da · commit 03eb318

File tree

40 files changed: +79 −65 lines changed

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java (2 additions, 1 deletion)

@@ -22,6 +22,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Files;

 import com.google.common.io.Closeables;

@@ -54,7 +55,7 @@ public void create() throws IOException {
       localDirs[i] = JavaUtils.createDirectory(root, "spark").getAbsolutePath();

       for (int p = 0; p < subDirsPerLocalDir; p ++) {
-        new File(localDirs[i], String.format("%02x", p)).mkdirs();
+        Files.createDirectories(new File(localDirs[i], String.format("%02x", p)).toPath());
       }
     }
   }

common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala (6 additions, 6 deletions)

@@ -95,7 +95,7 @@ private[spark] object IvyTestUtils {
       className: String,
       packageName: String): Seq[(String, File)] = {
     val rFilesDir = new File(dir, "R" + File.separator + "pkg")
-    new File(rFilesDir, "R").mkdirs()
+    SparkFileUtils.createDirectory(new File(rFilesDir, "R"))
     val contents =
       s"""myfunc <- function(x) {
         | SparkR:::callJStatic("$packageName.$className", "myFunc", x)
@@ -150,11 +150,11 @@
       useIvyLayout: Boolean): File = {
     if (useIvyLayout) {
       val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true)
-      ivyXmlPath.mkdirs()
+      SparkFileUtils.createDirectory(ivyXmlPath)
       createIvyDescriptor(ivyXmlPath, artifact, dependencies)
     } else {
       val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout)
-      pomPath.mkdirs()
+      SparkFileUtils.createDirectory(pomPath)
       createPom(pomPath, artifact, dependencies)
     }
   }
@@ -293,13 +293,13 @@
     // Where the root of the repository exists, and what Ivy will search in
     val tempPath = tempDir.getOrElse(SparkFileUtils.createTempDir())
     // Create directory if it doesn't exist
-    tempPath.mkdirs()
+    SparkFileUtils.createDirectory(tempPath)
     // Where to create temporary class files and such
     val root = new File(tempPath, tempPath.hashCode().toString)
-    root.mkdirs()
+    SparkFileUtils.createDirectory(root)
     try {
       val jarPath = pathFromCoordinate(artifact, tempPath, "jar", useIvyLayout)
-      jarPath.mkdirs()
+      SparkFileUtils.createDirectory(jarPath)
       val className = "MyLib"

       val javaClass = createJavaClass(root, className, artifact.groupId)

connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala (1 addition, 1 deletion)

@@ -1899,7 +1899,7 @@ abstract class AvroSuite
     withTempPath { tempDir =>
       val tempEmptyDir = s"$tempDir/sqlOverwrite"
       // Create a temp directory for table that will be overwritten
-      new File(tempEmptyDir).mkdirs()
+      Utils.createDirectory(tempEmptyDir)
       spark.sql(
         s"""
            |CREATE TEMPORARY VIEW episodes

connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala (2 additions, 1 deletion)

@@ -38,6 +38,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 import org.apache.spark._
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.streaming.kafka010.mocks.MockTime
+import org.apache.spark.util.Utils

 class KafkaRDDSuite extends SparkFunSuite {

@@ -91,7 +92,7 @@ class KafkaRDDSuite extends SparkFunSuite {
     val logs = new Pool[TopicPartition, UnifiedLog]()
     val logDir = kafkaTestUtils.brokerLogDir
     val dir = new File(logDir, topic + "-" + partition)
-    dir.mkdirs()
+    Utils.createDirectory(dir)
     val logProps = new ju.Properties()
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
     logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))

core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala (1 addition, 1 deletion)

@@ -153,7 +153,7 @@ private[deploy] object RPackageUtils extends Logging {
         if (verbose) {
           print(log"Creating directory: ${MDC(PATH, dir)}", printStream)
         }
-        dir.mkdirs
+        Utils.createDirectory(dir)
       } else {
         val inStream = jar.getInputStream(entry)
         val outPath = new File(tempDir, entryPath)

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala (1 addition, 1 deletion)

@@ -145,7 +145,7 @@ private[deploy] class DriverRunner(
    */
  private def createWorkingDirectory(): File = {
    val driverDir = new File(workDir, driverId)
-    if (!driverDir.exists() && !driverDir.mkdirs()) {
+    if (!driverDir.exists() && !Utils.createDirectory(driverDir)) {
      throw new IOException("Failed to create directory " + driverDir)
    }
    driverDir

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala (1 addition, 1 deletion)

@@ -600,7 +600,7 @@ private[deploy] class Worker(

       // Create the executor's working directory
       val executorDir = new File(workDir, appId + "/" + execId)
-      if (!executorDir.mkdirs()) {
+      if (!Utils.createDirectory(executorDir)) {
        throw new IOException("Failed to create directory " + executorDir)
      }

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala (1 addition, 1 deletion)

@@ -90,7 +90,7 @@ private[spark] class PipedRDD[T: ClassTag](
       val currentDir = new File(".")
       logDebug("currentDir = " + currentDir.getAbsolutePath())
       val taskDirFile = new File(taskDirectory)
-      taskDirFile.mkdirs()
+      Utils.createDirectory(taskDirFile)

       try {
         val tasksDirFilter = new NotEqualsFileNameFilter("tasks")

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala (1 addition, 1 deletion)

@@ -316,7 +316,7 @@ private[spark] class DiskBlockManager(
       throw SparkCoreErrors.failToCreateDirectoryError(dirToCreate.getAbsolutePath, maxAttempts)
     }
     try {
-      dirToCreate.mkdirs()
+      Utils.createDirectory(dirToCreate)
       Files.setPosixFilePermissions(
         dirToCreate.toPath, PosixFilePermissions.fromString("rwxrwx---"))
       if (dirToCreate.exists()) {

core/src/main/scala/org/apache/spark/util/Utils.scala (2 additions, 2 deletions)

@@ -458,7 +458,7 @@ private[spark] object Utils
    * to work around a security issue, see also SPARK-38631.
    */
   private def unTarUsingJava(source: File, dest: File): Unit = {
-    if (!dest.mkdirs && !dest.isDirectory) {
+    if (!Utils.createDirectory(dest) && !dest.isDirectory) {
       throw new IOException(s"Mkdirs failed to create $dest")
     } else {
       try {
@@ -810,7 +810,7 @@ private[spark] object Utils
     configuredLocalDirs.flatMap { root =>
       try {
         val rootDir = new File(root)
-        if (rootDir.exists || rootDir.mkdirs()) {
+        if (rootDir.exists || Utils.createDirectory(rootDir)) {
           val dir = createTempDir(root)
           chmod700(dir)
           Some(dir.getAbsolutePath)
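A side note on the diffs above (not from the commit): several call sites use the replacement in a Boolean context, e.g. `if (!Utils.createDirectory(executorDir))`, so the helper has to keep the `mkdirs()`-style Boolean contract while gaining NIO's error reporting. A minimal sketch of such a wrapper follows; the name and signature mirror the calls in the diff, but the body is an assumption for illustration, not Spark's actual implementation.

```scala
import java.io.File
import java.nio.file.Files

object DirUtils {
  // Hypothetical wrapper: create `dir` (and missing parents) via NIO while
  // preserving the Boolean contract that File.mkdirs()-style callers expect.
  def createDirectory(dir: File): Boolean = {
    try {
      // Unlike File.mkdirs(), this throws with the underlying cause on
      // failure, and is a no-op when the directory already exists.
      Files.createDirectories(dir.toPath)
      dir.isDirectory
    } catch {
      case e: Exception =>
        // A real implementation would log here so the reason for the
        // failure is not lost, then report failure to the caller.
        System.err.println(s"Failed to create directory $dir: $e")
        false
    }
  }
}
```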
