Skip to content

Commit e93f984

Browse files
committed
[SPARK-19812] YARN shuffle service fails to relocate recovery DB across NFS directories
1 parent 0332063 commit e93f984

File tree

1 file changed

+13
-10
lines changed

1 file changed

+13
-10
lines changed

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.io.IOException;
2222
import java.nio.charset.StandardCharsets;
2323
import java.nio.ByteBuffer;
24-
import java.nio.file.Files;
2524
import java.util.List;
2625
import java.util.Map;
2726

@@ -340,17 +339,17 @@ protected Path getRecoveryPath(String fileName) {
340339
* when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
341340
* it will uses a YARN local dir.
342341
*/
343-
protected File initRecoveryDb(String dbFileName) {
342+
protected File initRecoveryDb(String dbName) {
344343
if (_recoveryPath != null) {
345-
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
344+
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName);
346345
if (recoveryFile.exists()) {
347346
return recoveryFile;
348347
}
349348
}
350349
// db doesn't exist in recovery path go check local dirs for it
351350
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
352351
for (String dir : localDirs) {
353-
File f = new File(new Path(dir).toUri().getPath(), dbFileName);
352+
File f = new File(new Path(dir).toUri().getPath(), dbName);
354353
if (f.exists()) {
355354
if (_recoveryPath == null) {
356355
// If NM recovery is not enabled, we should specify the recovery path using NM local
@@ -363,25 +362,29 @@ protected File initRecoveryDb(String dbFileName) {
363362
// make sure to move all DBs to the recovery path from the old NM local dirs.
364363
// If another DB was initialized first just make sure all the DBs are in the same
365364
// location.
366-
File newLoc = new File(_recoveryPath.toUri().getPath(), dbFileName);
367-
if (!newLoc.equals(f)) {
365+
Path newLoc = new Path(_recoveryPath, dbName);
366+
Path copyFrom = new Path(f.toURI());
367+
if (!newLoc.equals(copyFrom)) {
368+
logger.info("Moving " + copyFrom + " to: " + newLoc);
368369
try {
369-
Files.move(f.toPath(), newLoc.toPath());
370+
// The move here needs to handle moving non-empty directories across NFS mounts
371+
FileSystem fs = FileSystem.getLocal(_conf);
372+
fs.rename(copyFrom, newLoc);
370373
} catch (Exception e) {
371374
// Fail to move recovery file to new path, just continue on with new DB location
372375
logger.error("Failed to move recovery file {} to the path {}",
373-
dbFileName, _recoveryPath.toString(), e);
376+
dbName, _recoveryPath.toString(), e);
374377
}
375378
}
376-
return newLoc;
379+
return new File(newLoc.toUri().getPath());
377380
}
378381
}
379382
}
380383
if (_recoveryPath == null) {
381384
_recoveryPath = new Path(localDirs[0]);
382385
}
383386

384-
return new File(_recoveryPath.toUri().getPath(), dbFileName);
387+
return new File(_recoveryPath.toUri().getPath(), dbName);
385388
}
386389

387390
/**

0 commit comments

Comments
 (0)