Skip to content

[SPARK-23020][CORE] Fix races in launcher code, test. #20297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.launcher;

import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -25,13 +26,13 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.mockito.Mockito.*;

import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.internal.config.package$;
import org.apache.spark.util.Utils;

Expand Down Expand Up @@ -121,8 +122,7 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}

// TODO: [SPARK-23020] Re-enable this
@Ignore
@Test
public void testInProcessLauncher() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
Expand All @@ -139,7 +139,9 @@ public void testInProcessLauncher() throws Exception {
// Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
// Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
// See SPARK-23019 and SparkContext.stop() for details.
TimeUnit.MILLISECONDS.sleep(500);
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
});
}
}

Expand All @@ -148,26 +150,35 @@ private void inProcessLauncherTestImpl() throws Exception {
SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
doAnswer(invocation -> {
SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
transitions.add(h.getState());
synchronized (transitions) {
transitions.add(h.getState());
}
return null;
}).when(listener).stateChanged(any(SparkAppHandle.class));

SparkAppHandle handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
SparkAppHandle handle = null;
try {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
} finally {
if (handle != null) {
handle.kill();
}
}
}

public static class SparkLauncherTestApp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
private List<Listener> listeners;
private State state;
private String appId;
private boolean disposed;
private volatile boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
this.server = server;
Expand Down Expand Up @@ -70,16 +70,15 @@ public void stop() {

@Override
public synchronized void disconnect() {
if (!disposed) {
disposed = true;
if (!isDisposed()) {
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);
dispose();
}
}

Expand All @@ -95,6 +94,21 @@ boolean isDisposed() {
return disposed;
}

/**
 * Disposes of this handle. Does nothing if the handle is already disposed; otherwise the
 * handle is unregistered from the server, moved to LOST when its current state is not final,
 * and finally flagged as disposed.
 */
synchronized void dispose() {
  if (isDisposed()) {
    return;
  }

  // Unregistering has to come first, so that the connection with the app is known to be
  // really terminated before the state is inspected.
  server.unregister(this);

  State current = getState();
  if (!current.isFinal()) {
    setState(State.LOST);
  }

  // Only flag the handle as disposed once all the cleanup above has run.
  disposed = true;
}

/**
 * Sets the handle's state by delegating to the two-argument overload, passing {@code false}
 * as the second argument.
 *
 * NOTE(review): the meaning of the boolean flag is defined by the two-argument overload,
 * which is not visible here -- confirm before relying on it.
 *
 * @param s the new state.
 */
void setState(State s) {
setState(s, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@ public synchronized void disconnect() {

@Override
public synchronized void kill() {
disconnect();
if (childProc != null) {
if (childProc.isAlive()) {
childProc.destroyForcibly();
if (!isDisposed()) {
setState(State.KILLED);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we should set the state to KILLED as soon as the kill method is called? Even if the code below fails (throws an exception), should the state still be KILLED?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, I had the same question in the last review. We should figure it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the calls below should raise exceptions. Even the socket close is wrapped in a try..catch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if the order doesn't matter, I think it's more conventional to set the state at the end.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then the comment I made in the previous PR applies. Closing the socket / killing the child can have other implications (like changing the state) and it's easier to reason about what happens if the desired state change happens first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok makes sense.

disconnect();
if (childProc != null) {
if (childProc.isAlive()) {
childProc.destroyForcibly();
}
childProc = null;
}
childProc = null;
}
setState(State.KILLED);
}

void setChildProc(Process childProc, String loggerName, InputStream logStream) {
Expand Down Expand Up @@ -94,8 +96,6 @@ void monitorChild() {
return;
}

disconnect();

int ec;
try {
ec = proc.exitValue();
Expand All @@ -118,6 +118,8 @@ void monitorChild() {
if (newState != null) {
setState(newState, true);
}

disconnect();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ class InProcessAppHandle extends AbstractAppHandle {

@Override
public synchronized void kill() {
LOG.warning("kill() may leave the underlying app running in in-process mode.");
disconnect();

// Interrupt the thread. This is not guaranteed to kill the app, though.
if (app != null) {
app.interrupt();
if (!isDisposed()) {
LOG.warning("kill() may leave the underlying app running in in-process mode.");
setState(State.KILLED);
disconnect();

// Interrupt the thread. This is not guaranteed to kill the app, though.
if (app != null) {
app.interrupt();
}
}

setState(State.KILLED);
}

synchronized void start(String appName, Method main, String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ abstract class LauncherConnection implements Closeable, Runnable {
public void run() {
try {
FilteredObjectInputStream in = new FilteredObjectInputStream(socket.getInputStream());
while (!closed) {
while (isOpen()) {
Message msg = (Message) in.readObject();
handle(msg);
}
Expand Down Expand Up @@ -95,15 +95,15 @@ protected synchronized void send(Message msg) throws IOException {
}

@Override
public void close() throws IOException {
if (!closed) {
synchronized (this) {
if (!closed) {
closed = true;
socket.close();
}
}
public synchronized void close() throws IOException {
if (isOpen()) {
closed = true;
socket.close();
}
}

/**
 * Returns whether this connection is still open, i.e. the {@code closed} flag has not been set.
 */
boolean isOpen() {
return !closed;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,33 @@ void unregister(AbstractAppHandle handle) {
break;
}
}

// If there is a live connection for this handle, we need to wait for it to finish before
// returning, otherwise there might be a race between the connection thread processing
// buffered data and the handle cleaning up after itself, leading to potentially the wrong
// state being reported for the handle.
ServerConnection conn = null;
synchronized (clients) {
for (ServerConnection c : clients) {
if (c.handle == handle) {
conn = c;
break;
}
}
}

if (conn != null) {
synchronized (conn) {
if (conn.isOpen()) {
try {
conn.wait();
} catch (InterruptedException ie) {
// Ignore.
}
}
}
}

unref();
}

Expand Down Expand Up @@ -288,7 +315,7 @@ private String createSecret() {
private class ServerConnection extends LauncherConnection {

private TimerTask timeout;
private AbstractAppHandle handle;
volatile AbstractAppHandle handle;

ServerConnection(Socket socket, TimerTask timeout) throws IOException {
super(socket);
Expand All @@ -313,7 +340,7 @@ protected void handle(Message msg) throws IOException {
} else {
if (handle == null) {
throw new IllegalArgumentException("Expected hello, got: " +
msg != null ? msg.getClass().getName() : null);
msg != null ? msg.getClass().getName() : null);
}
if (msg instanceof SetAppId) {
SetAppId set = (SetAppId) msg;
Expand All @@ -331,23 +358,27 @@ protected void handle(Message msg) throws IOException {
timeout.cancel();
}
close();
if (handle != null) {
Copy link
Contributor Author

@vanzin vanzin Jan 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix for the new race described in the summary (the code is moved here from below).

This changes behavior slightly: the handle now waits for the child process / thread to finish before disposing itself, whereas before that would happen as soon as the connection with the child process / thread was closed.

handle.dispose();
}
} finally {
timeoutTimer.purge();
}
}

@Override
public void close() throws IOException {
if (!isOpen()) {
return;
}

synchronized (clients) {
clients.remove(this);
}
super.close();
if (handle != null) {
if (!handle.getState().isFinal()) {
LOG.log(Level.WARNING, "Lost connection to spark application.");
handle.setState(SparkAppHandle.State.LOST);
}
handle.disconnect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we disconnect now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


synchronized (this) {
super.close();
notifyAll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why notifyAll? Who might be waiting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See L239: `unregister()` blocks in `conn.wait()` there while the connection is still open.

}
}

Expand Down
42 changes: 35 additions & 7 deletions launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.launcher;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.junit.After;
Expand Down Expand Up @@ -47,19 +48,46 @@ public void postChecks() {
assertNull(server);
}

protected void waitFor(SparkAppHandle handle) throws Exception {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
protected void waitFor(final SparkAppHandle handle) throws Exception {
try {
while (!handle.getState().isFinal()) {
assertTrue("Timed out waiting for handle to transition to final state.",
System.nanoTime() < deadline);
TimeUnit.MILLISECONDS.sleep(10);
}
eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
assertTrue("Handle is not in final state.", handle.getState().isFinal());
});
} finally {
if (!handle.getState().isFinal()) {
handle.kill();
}
}

// Wait until the handle has been marked as disposed, to make sure all cleanup tasks
// have been performed.
AbstractAppHandle ahandle = (AbstractAppHandle) handle;
eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
assertTrue("Handle is still not marked as disposed.", ahandle.isDisposed());
});
}

/**
 * Repeatedly runs the given check until it completes without throwing, pausing for "period"
 * between attempts. If an attempt fails once the timeout has elapsed, an
 * IllegalStateException wrapping the last failure is thrown.
 */
protected void eventually(Duration timeout, Duration period, Runnable check) throws Exception {
  assertTrue("Timeout needs to be larger than period.", timeout.compareTo(period) > 0);

  final long deadline = System.nanoTime() + timeout.toNanos();
  for (int attempts = 1; ; attempts++) {
    try {
      check.run();
      return;
    } catch (Throwable failure) {
      // Assertion failures are Errors, so everything is caught here and retried until the
      // deadline has passed.
      boolean expired = System.nanoTime() >= deadline;
      if (expired) {
        throw new IllegalStateException(
          String.format("Failed check after %d tries: %s.", attempts, failure.getMessage()),
          failure);
      }
      Thread.sleep(period.toMillis());
    }
  }
}

}
Loading