Skip to content

Commit 9378ba3

Browse files
committed
fail gracefully on corrupt leveldb files
1 parent acedb62 commit 9378ba3

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,19 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
100100
throw new IOException("Unable to create state store", dbExc);
101101
}
102102
} else {
103-
throw e;
103+
logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
104+
"recover state for existing applications", registeredExecutorFile, e);
105+
for (File f: registeredExecutorFile.listFiles()) {
106+
f.delete();
107+
}
108+
registeredExecutorFile.delete();
109+
options.createIfMissing(true);
110+
try {
111+
tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
112+
} catch (NativeDB.DBException dbExc) {
113+
throw new IOException("Unable to create state store", dbExc);
114+
}
115+
104116
}
105117
}
106118
try {

network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ protected void serviceInit(Configuration conf) {
127127
try {
128128
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
129129
} catch (Exception e) {
130-
logger.error("Failed to initial external shuffle service", e);
130+
logger.error("Failed to initialize external shuffle service", e);
131131
}
132132

133133
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();

yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
172172
ShuffleTestAccessor.reloadRegisteredExecutors(db) shouldBe empty
173173
}
174174

175-
ignore("shuffle service should be robust to corrupt registered executor file") {
175+
test("shuffle service should be robust to corrupt registered executor file") {
176176
s1 = new YarnShuffleService
177177
s1.init(yarnConfig)
178178
val app1Id = ApplicationId.newInstance(0, 1)

0 commit comments

Comments
 (0)