Skip to content

Commit 9240b77

Browse files
author
Sahil Takiar
committed
[SPARK-24243][CORE] Expose exceptions from InProcessAppHandle
Adds a new method to `SparkAppHandle` called `getError` which returns the exception (if present) that caused the underlying Spark app to fail. New tests added to `SparkLauncherSuite` for the new method.
1 parent 135ff16 commit 9240b77

File tree

6 files changed

+162
-13
lines changed

6 files changed

+162
-13
lines changed

core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
public class SparkLauncherSuite extends BaseSuite {
4242

4343
private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
44+
private static final String EXCEPTION_MESSAGE = "dummy-exception";
45+
private static final RuntimeException DUMMY_EXCEPTION = new RuntimeException(EXCEPTION_MESSAGE);
4446

4547
private final SparkLauncher launcher = new SparkLauncher();
4648

@@ -130,17 +132,8 @@ public void testInProcessLauncher() throws Exception {
130132
try {
131133
inProcessLauncherTestImpl();
132134
} finally {
133-
Properties p = new Properties();
134-
for (Map.Entry<Object, Object> e : properties.entrySet()) {
135-
p.put(e.getKey(), e.getValue());
136-
}
137-
System.setProperties(p);
138-
// Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
139-
// Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
140-
// See SPARK-23019 and SparkContext.stop() for details.
141-
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
142-
assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
143-
});
135+
restoreSystemProperties(properties);
136+
waitForSparkContextShutdown();
144137
}
145138
}
146139

@@ -227,6 +220,82 @@ public void testInProcessLauncherDoesNotKillJvm() throws Exception {
227220
assertEquals(SparkAppHandle.State.LOST, handle.getState());
228221
}
229222

223+
@Test
224+
public void testInProcessLauncherGetError() throws Exception {
225+
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
226+
// properties, and that can cause test failures down the test pipeline. So restore the original
227+
// system properties after this test runs.
228+
Map<Object, Object> properties = new HashMap<>(System.getProperties());
229+
230+
SparkAppHandle handle = null;
231+
try {
232+
handle = new InProcessLauncher()
233+
.setMaster("local")
234+
.setAppResource(SparkLauncher.NO_RESOURCE)
235+
.setMainClass(ErrorInProcessTestApp.class.getName())
236+
.addAppArgs("hello")
237+
.startApplication();
238+
239+
final SparkAppHandle _handle = handle;
240+
eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> {
241+
assertEquals(SparkAppHandle.State.FAILED, _handle.getState());
242+
});
243+
244+
assertNotNull(handle.getError());
245+
assertTrue(handle.getError().isPresent());
246+
assertSame(handle.getError().get(), DUMMY_EXCEPTION);
247+
} finally {
248+
if (handle != null) {
249+
handle.kill();
250+
}
251+
restoreSystemProperties(properties);
252+
waitForSparkContextShutdown();
253+
}
254+
}
255+
256+
@Test
257+
public void testSparkLauncherGetError() throws Exception {
258+
SparkAppHandle handle = null;
259+
try {
260+
handle = new SparkLauncher()
261+
.setMaster("local")
262+
.setAppResource(SparkLauncher.NO_RESOURCE)
263+
.setMainClass(ErrorInProcessTestApp.class.getName())
264+
.addAppArgs("hello")
265+
.startApplication();
266+
267+
final SparkAppHandle _handle = handle;
268+
eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> {
269+
assertEquals(SparkAppHandle.State.FAILED, _handle.getState());
270+
});
271+
272+
assertNotNull(handle.getError());
273+
assertTrue(handle.getError().isPresent());
274+
assertTrue(handle.getError().get().getMessage().contains(EXCEPTION_MESSAGE));
275+
} finally {
276+
if (handle != null) {
277+
handle.kill();
278+
}
279+
}
280+
}
281+
282+
private void restoreSystemProperties(Map<Object, Object> properties) {
283+
Properties p = new Properties();
284+
for (Map.Entry<Object, Object> e : properties.entrySet()) {
285+
p.put(e.getKey(), e.getValue());
286+
}
287+
System.setProperties(p);
288+
}
289+
290+
private void waitForSparkContextShutdown() throws Exception {
291+
// Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
292+
// Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
293+
// See SPARK-23019 and SparkContext.stop() for details.
294+
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
295+
assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
296+
});
297+
}
298+
230299
public static class SparkLauncherTestApp {
231300

232301
public static void main(String[] args) throws Exception {
@@ -264,4 +333,15 @@ public static void main(String[] args) throws Exception {
264333

265334
}
266335

336+
/**
337+
* Similar to {@link InProcessTestApp} except it throws an exception
338+
*/
339+
public static class ErrorInProcessTestApp {
340+
341+
public static void main(String[] args) {
342+
assertNotEquals(0, args.length);
343+
assertEquals(args[0], "hello");
344+
throw DUMMY_EXCEPTION;
345+
}
346+
}
267347
}

launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.launcher;
1919

2020
import java.io.InputStream;
21+
import java.util.Optional;
2122
import java.util.logging.Level;
2223
import java.util.logging.Logger;
2324

@@ -29,7 +30,7 @@ class ChildProcAppHandle extends AbstractAppHandle {
2930
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
3031

3132
private volatile Process childProc;
32-
private OutputRedirector redirector;
33+
private volatile OutputRedirector redirector;
3334

3435
ChildProcAppHandle(LauncherServer server) {
3536
super(server);
@@ -46,6 +47,25 @@ public synchronized void disconnect() {
4647
}
4748
}
4849

50+
/**
51+
* Parses the logs of {@code spark-submit} and returns the last exception thrown.
52+
*
53+
* <p>
54+
 * Since {@link SparkLauncher} runs {@code spark-submit} in a sub-process, it's difficult to
55+
* accurately retrieve the full {@link Throwable} from the {@code spark-submit} process.
56+
* This method parses the logs of the sub-process and provides a best-effort attempt at
57+
* returning the last exception thrown by the {@code spark-submit} process. Only the exception
58+
* message is parsed, the associated stacktrace is meaningless.
59+
* </p>
60+
*
61+
* @return an {@link Optional} containing a {@link RuntimeException} with the parsed
62+
 * exception, otherwise returns {@link Optional#empty()}
63+
*/
64+
@Override
65+
public Optional<Throwable> getError() {
66+
return redirector != null ? Optional.ofNullable(redirector.getError()) : Optional.empty();
67+
}
68+
4969
@Override
5070
public synchronized void kill() {
5171
if (!isDisposed()) {

launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.spark.launcher;
1919

20+
import java.lang.reflect.InvocationTargetException;
2021
import java.lang.reflect.Method;
22+
import java.util.Optional;
2123
import java.util.concurrent.atomic.AtomicLong;
2224
import java.util.logging.Level;
2325
import java.util.logging.Logger;
@@ -31,6 +33,8 @@ class InProcessAppHandle extends AbstractAppHandle {
3133
// Avoid really long thread names.
3234
private static final int MAX_APP_NAME_LEN = 16;
3335

36+
private volatile Throwable error;
37+
3438
private Thread app;
3539

3640
InProcessAppHandle(LauncherServer server) {
@@ -51,6 +55,11 @@ public synchronized void kill() {
5155
}
5256
}
5357

58+
@Override
59+
public Optional<Throwable> getError() {
60+
return Optional.ofNullable(error);
61+
}
62+
5463
synchronized void start(String appName, Method main, String[] args) {
5564
CommandBuilderUtils.checkState(app == null, "Handle already started.");
5665

@@ -62,7 +71,11 @@ synchronized void start(String appName, Method main, String[] args) {
6271
try {
6372
main.invoke(null, (Object) args);
6473
} catch (Throwable t) {
74+
if (t instanceof InvocationTargetException) {
75+
t = t.getCause();
76+
}
6577
LOG.log(Level.WARNING, "Application failed with exception.", t);
78+
error = t;
6679
setState(State.FAILED);
6780
}
6881

launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class OutputRedirector {
3737
private final ChildProcAppHandle callback;
3838

3939
private volatile boolean active;
40+
private volatile Throwable error;
4041

4142
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
4243
this(in, loggerName, tf, null);
@@ -61,6 +62,10 @@ private void redirect() {
6162
while ((line = reader.readLine()) != null) {
6263
if (active) {
6364
sink.info(line.replaceFirst("\\s*$", ""));
65+
if (error == null && containsIgnoreCase(line, "Error") || containsIgnoreCase(line,
66+
"Exception")) {
67+
error = new RuntimeException(line);
68+
}
6469
}
6570
}
6671
} catch (IOException e) {
@@ -85,4 +90,24 @@ boolean isAlive() {
8590
return thread.isAlive();
8691
}
8792

93+
Throwable getError() {
94+
return error;
95+
}
96+
97+
/**
98+
* Copied from Apache Commons Lang {@code StringUtils#containsIgnoreCase(String, String)}
99+
*/
100+
private static boolean containsIgnoreCase(String str, String searchStr) {
101+
if (str == null || searchStr == null) {
102+
return false;
103+
}
104+
int len = searchStr.length();
105+
int max = str.length() - len;
106+
for (int i = 0; i <= max; i++) {
107+
if (str.regionMatches(true, i, searchStr, 0, len)) {
108+
return true;
109+
}
110+
}
111+
return false;
112+
}
88113
}

launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.launcher;
1919

20+
import java.util.Optional;
21+
2022
/**
2123
* A handle to a running Spark application.
2224
* <p>
@@ -100,6 +102,12 @@ public boolean isFinal() {
100102
*/
101103
void disconnect();
102104

105+
/**
106+
* If the application failed due to an error, return the underlying error. If the app
107+
* succeeded, this method returns an empty {@link Optional}.
108+
*/
109+
Optional<Throwable> getError();
110+
103111
/**
104112
* Listener for updates to a handle's state. The callbacks do not receive information about
105113
* what exactly has changed, just that an update has occurred.

project/MimaExcludes.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,10 @@ object MimaExcludes {
106106
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"),
107107

108108
// [SPARK-23042] Use OneHotEncoderModel to encode labels in MultilayerPerceptronClassifier
109-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter")
109+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter"),
110+
111+
// [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle
112+
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.launcher.SparkAppHandle.getError")
110113
)
111114

112115
// Exclude rules for 2.3.x

0 commit comments

Comments
 (0)