Skip to content

Commit e9f99e8

Browse files
committed
cleanup the handling of bad dbs a little
1 parent 9378ba3 commit e9f99e8

File tree

1 file changed

+12
-14
lines changed

1 file changed

+12
-14
lines changed

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
8787
options.createIfMissing(false);
8888
options.logger(new LevelDBLogger());
8989
DB tmpDb;
90-
ConcurrentMap<AppExecId, ExecutorShuffleInfo> tmpExecutors;
9190
try {
9291
tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
9392
} catch (NativeDB.DBException e) {
@@ -100,6 +99,8 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
10099
throw new IOException("Unable to create state store", dbExc);
101100
}
102101
} else {
102+
// the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
103+
// one, so we can keep processing new apps
103104
logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
104105
"recover state for existing applications", registeredExecutorFile, e);
105106
for (File f: registeredExecutorFile.listFiles()) {
@@ -115,15 +116,8 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
115116

116117
}
117118
}
118-
try {
119-
tmpExecutors = reloadRegisteredExecutors(tmpDb);
120-
} catch (Exception e) {
121-
logger.info("Error opening leveldb file {}", registeredExecutorFile, e);
122-
tmpDb = null;
123-
tmpExecutors = Maps.newConcurrentMap();
124-
}
119+
executors = reloadRegisteredExecutors(tmpDb);
125120
db = tmpDb;
126-
executors = tmpExecutors;
127121
} else {
128122
db = null;
129123
executors = Maps.newConcurrentMap();
@@ -348,7 +342,7 @@ private static AppExecId parseDbAppExecKey(byte[] bytes) {
348342

349343
@VisibleForTesting
350344
static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db)
351-
throws IOException, ClassNotFoundException {
345+
throws IOException {
352346
ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
353347
if (db != null) {
354348
DBIterator itr = db.iterator();
@@ -357,10 +351,14 @@ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(D
357351
Map.Entry<byte[], byte[]> e = itr.next();
358352
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(e.getValue()));
359353
AppExecId id = parseDbAppExecKey(e.getKey());
360-
registeredExecutors.put(
361-
id,
362-
(ExecutorShuffleInfo) in.readObject()
363-
);
354+
try {
355+
registeredExecutors.put(
356+
id,
357+
(ExecutorShuffleInfo) in.readObject()
358+
);
359+
} catch (ClassNotFoundException e1) {
360+
throw new IOException(e1);
361+
}
364362
in.close();
365363
}
366364
}

0 commit comments

Comments
 (0)