Skip to content

Commit 2e6abf7

Browse files
committed
Be more cautious when cleaning up during failed write and re-throw user exceptions
1 parent 165f52f commit 2e6abf7

File tree

2 files changed

+24
-5
lines changed

2 files changed

+24
-5
lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,22 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
139139

140140
@Override
141141
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
142-
boolean success = false;
143142
try {
144143
while (records.hasNext()) {
145144
insertRecordIntoSorter(records.next());
146145
}
147146
closeAndWriteOutput();
148-
success = true;
149-
} finally {
150-
if (!success) {
151-
sorter.cleanupAfterError();
147+
} catch (Exception e) {
148+
// Trigger a cleanup if we encountered an error
149+
if (sorter != null) {
150+
try {
151+
sorter.cleanupAfterError();
152+
} catch (Exception e2) {
153+
throw new RuntimeException("Failed to perform cleanup after exception", e);
154+
}
152155
}
156+
// re-throw the exception
157+
throw e;
153158
}
154159
}
155160

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,20 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
253253
createWriter(false).stop(false);
254254
}
255255

256+
@Test(expected=RuntimeException.class)
257+
public void writeFailurePropegates() throws Exception {
258+
class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
259+
@Override public boolean hasNext() {
260+
throw new RuntimeException("panda magic");
261+
}
262+
@Override public Product2<Object, Object> next() {
263+
return null;
264+
}
265+
}
266+
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
267+
writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
268+
}
269+
256270
@Test
257271
public void writeEmptyIterator() throws Exception {
258272
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);

0 commit comments

Comments
 (0)