Skip to content

Commit 0f92be5

Browse files
holdenkJoshRosen
authored andcommitted
[SPARK-8498] [TUNGSTEN] fix npe in errorhandling path in unsafeshuffle writer
Author: Holden Karau <holden@pigscanfly.ca> Closes apache#6918 from holdenk/SPARK-8498-fix-npe-in-errorhandling-path-in-unsafeshuffle-writer and squashes the following commits: f807832 [Holden Karau] Log error if we can't throw it 855f9aa [Holden Karau] Spelling - not my strongest suite. Fix Propegates to Propagates. 039d620 [Holden Karau] Add missing closeandwriteoutput 30e558d [Holden Karau] go back to try/finally e503b8c [Holden Karau] Improve the test to ensure we aren't masking the underlying exception ae0b7a7 [Holden Karau] Fix the test 2e6abf7 [Holden Karau] Be more cautious when cleaning up during failed write and re-throw user exceptions
1 parent 6ceb169 commit 0f92be5

File tree

2 files changed

+33
-2
lines changed

2 files changed

+33
-2
lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
139139

140140
@Override
141141
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
142+
// Keep track of success so we know if we ecountered an exception
143+
// We do this rather than a standard try/catch/re-throw to handle
144+
// generic throwables.
142145
boolean success = false;
143146
try {
144147
while (records.hasNext()) {
@@ -147,8 +150,19 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
147150
closeAndWriteOutput();
148151
success = true;
149152
} finally {
150-
if (!success) {
151-
sorter.cleanupAfterError();
153+
if (sorter != null) {
154+
try {
155+
sorter.cleanupAfterError();
156+
} catch (Exception e) {
157+
// Only throw this error if we won't be masking another
158+
// error.
159+
if (success) {
160+
throw e;
161+
} else {
162+
logger.error("In addition to a failure during writing, we failed during " +
163+
"cleanup.", e);
164+
}
165+
}
152166
}
153167
}
154168
}

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,23 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
253253
createWriter(false).stop(false);
254254
}
255255

256+
class PandaException extends RuntimeException {
257+
}
258+
259+
@Test(expected=PandaException.class)
260+
public void writeFailurePropagates() throws Exception {
261+
class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
262+
@Override public boolean hasNext() {
263+
throw new PandaException();
264+
}
265+
@Override public Product2<Object, Object> next() {
266+
return null;
267+
}
268+
}
269+
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
270+
writer.write(new BadRecords());
271+
}
272+
256273
@Test
257274
public void writeEmptyIterator() throws Exception {
258275
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);

0 commit comments

Comments
 (0)