Skip to content

[SPARK-23020][core] Fix race in SparkAppHandle cleanup, again. #20388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -29,15 +30,15 @@ abstract class AbstractAppHandle implements SparkAppHandle {

private final LauncherServer server;

private LauncherConnection connection;
private LauncherServer.ServerConnection connection;
private List<Listener> listeners;
private State state;
private AtomicReference<State> state;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the rationale behind this? Previously we just make sure all access and modification to state is synchronized, are you changing it to AtomicReference for performance reasons?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new code, synchronization would cause a deadlock.

  • handle calls closeAndWait() inside synchronized block which joins connection thread
  • connection thread would call setState() on the handle and cause a deadlock

Changing the state should be as thread-safe as before with the new code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

private String appId;
private volatile boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
this.server = server;
this.state = State.UNKNOWN;
this.state = new AtomicReference<>(State.UNKNOWN);
}

@Override
Expand All @@ -50,7 +51,7 @@ public synchronized void addListener(Listener l) {

@Override
public State getState() {
return state;
return state.get();
}

@Override
Expand All @@ -73,7 +74,7 @@ public synchronized void disconnect() {
if (!isDisposed()) {
if (connection != null) {
try {
connection.close();
connection.closeAndWait();
} catch (IOException ioe) {
// no-op.
}
Expand All @@ -82,7 +83,7 @@ public synchronized void disconnect() {
}
}

void setConnection(LauncherConnection connection) {
void setConnection(LauncherServer.ServerConnection connection) {
this.connection = connection;
}

Expand All @@ -99,12 +100,9 @@ boolean isDisposed() {
*/
synchronized void dispose() {
if (!isDisposed()) {
// Unregister first to make sure that the connection with the app has been really
// terminated.
server.unregister(this);
if (!getState().isFinal()) {
setState(State.LOST);
}
// Set state to LOST if not yet final.
setState(State.LOST, false);
this.disposed = true;
}
}
Expand All @@ -113,14 +111,24 @@ void setState(State s) {
setState(s, false);
}

synchronized void setState(State s, boolean force) {
if (force || !state.isFinal()) {
state = s;
void setState(State s, boolean force) {
if (force) {
state.set(s);
fireEvent(false);
} else {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { state, s });
return;
}

State current = state.get();
while (!current.isFinal()) {
if (state.compareAndSet(current, s)) {
fireEvent(false);
return;
}
current = state.get();
}

LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { current, s });
}

synchronized void setAppId(String appId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,12 @@ void monitorChild() {
ec = 1;
}

State currState = getState();
State newState = null;
if (ec != 0) {
State currState = getState();
// Override state with failure if the current state is not final, or is success.
if (!currState.isFinal() || currState == State.FINISHED) {
newState = State.FAILED;
setState(State.FAILED, true);
}
} else if (!currState.isFinal()) {
newState = State.LOST;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this not needed anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done by dispose() already.

}

if (newState != null) {
setState(newState, true);
}

disconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,7 @@ synchronized void start(String appName, Method main, String[] args) {
setState(State.FAILED);
}

synchronized (InProcessAppHandle.this) {
if (!isDisposed()) {
disconnect();
if (!getState().isFinal()) {
setState(State.LOST, true);
}
}
}
disconnect();
});

app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,32 +218,6 @@ void unregister(AbstractAppHandle handle) {
}
}

// If there is a live connection for this handle, we need to wait for it to finish before
// returning, otherwise there might be a race between the connection thread processing
// buffered data and the handle cleaning up after itself, leading to potentially the wrong
// state being reported for the handle.
ServerConnection conn = null;
synchronized (clients) {
for (ServerConnection c : clients) {
if (c.handle == handle) {
conn = c;
break;
}
}
}

if (conn != null) {
synchronized (conn) {
if (conn.isOpen()) {
try {
conn.wait();
} catch (InterruptedException ie) {
// Ignore.
}
}
}
}

unref();
}

Expand Down Expand Up @@ -312,16 +286,23 @@ private String createSecret() {
}
}

private class ServerConnection extends LauncherConnection {
class ServerConnection extends LauncherConnection {

private TimerTask timeout;
private volatile Thread connectionThread;
volatile AbstractAppHandle handle;

ServerConnection(Socket socket, TimerTask timeout) throws IOException {
super(socket);
this.timeout = timeout;
}

@Override
public void run() {
this.connectionThread = Thread.currentThread();
super.run();
}

@Override
protected void handle(Message msg) throws IOException {
try {
Expand Down Expand Up @@ -376,9 +357,23 @@ public void close() throws IOException {
clients.remove(this);
}

synchronized (this) {
super.close();
notifyAll();
super.close();
}

/**
* Close the connection and wait for any buffered data to be processed before returning.
* This ensures any changes reported by the child application take effect.
*/
public void closeAndWait() throws IOException {
close();

Thread connThread = this.connectionThread;
if (Thread.currentThread() != connThread) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a safeguard or it can really happen? i.e. the connection thread calls closeAndWait.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More of a safeguard. I don't think it would happen in this version of the code, but better be safe.

try {
connThread.join();
} catch (InterruptedException ie) {
// Ignore.
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ import org.apache.spark.util.Utils

/**
* Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN
* applications, and require the Spark assembly to be built before they can be successfully
* run.
* applications.
*/
@ExtendedYarnTest
class YarnClusterSuite extends BaseYarnClusterSuite {
Expand Down Expand Up @@ -152,7 +151,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
}

test("run Python application in yarn-cluster mode using " +
" spark.yarn.appMasterEnv to override local envvar") {
"spark.yarn.appMasterEnv to override local envvar") {
testPySpark(
clientMode = false,
extraConf = Map(
Expand Down