-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-23020][core] Fix race in SparkAppHandle cleanup, again. #20388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,19 +104,12 @@ void monitorChild() { | |
ec = 1; | ||
} | ||
|
||
State currState = getState(); | ||
State newState = null; | ||
if (ec != 0) { | ||
State currState = getState(); | ||
// Override state with failure if the current state is not final, or is success. | ||
if (!currState.isFinal() || currState == State.FINISHED) { | ||
newState = State.FAILED; | ||
setState(State.FAILED, true); | ||
} | ||
} else if (!currState.isFinal()) { | ||
newState = State.LOST; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this not needed anymore? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is done by |
||
} | ||
|
||
if (newState != null) { | ||
setState(newState, true); | ||
} | ||
|
||
disconnect(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -218,32 +218,6 @@ void unregister(AbstractAppHandle handle) { | |
} | ||
} | ||
|
||
// If there is a live connection for this handle, we need to wait for it to finish before | ||
// returning, otherwise there might be a race between the connection thread processing | ||
// buffered data and the handle cleaning up after itself, leading to potentially the wrong | ||
// state being reported for the handle. | ||
ServerConnection conn = null; | ||
synchronized (clients) { | ||
for (ServerConnection c : clients) { | ||
if (c.handle == handle) { | ||
conn = c; | ||
break; | ||
} | ||
} | ||
} | ||
|
||
if (conn != null) { | ||
synchronized (conn) { | ||
if (conn.isOpen()) { | ||
try { | ||
conn.wait(); | ||
} catch (InterruptedException ie) { | ||
// Ignore. | ||
} | ||
} | ||
} | ||
} | ||
|
||
unref(); | ||
} | ||
|
||
|
@@ -312,16 +286,23 @@ private String createSecret() { | |
} | ||
} | ||
|
||
private class ServerConnection extends LauncherConnection { | ||
class ServerConnection extends LauncherConnection { | ||
|
||
private TimerTask timeout; | ||
private volatile Thread connectionThread; | ||
volatile AbstractAppHandle handle; | ||
|
||
ServerConnection(Socket socket, TimerTask timeout) throws IOException { | ||
super(socket); | ||
this.timeout = timeout; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
this.connectionThread = Thread.currentThread(); | ||
super.run(); | ||
} | ||
|
||
@Override | ||
protected void handle(Message msg) throws IOException { | ||
try { | ||
|
@@ -376,9 +357,23 @@ public void close() throws IOException { | |
clients.remove(this); | ||
} | ||
|
||
synchronized (this) { | ||
super.close(); | ||
notifyAll(); | ||
super.close(); | ||
} | ||
|
||
/** | ||
* Close the connection and wait for any buffered data to be processed before returning. | ||
* This ensures any changes reported by the child application take effect. | ||
*/ | ||
public void closeAndWait() throws IOException { | ||
close(); | ||
|
||
Thread connThread = this.connectionThread; | ||
if (Thread.currentThread() != connThread) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a safeguard or it can really happen? i.e. the connection thread calls There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More of a safeguard. I don't think it would happen in this version of the code, but better be safe. |
||
try { | ||
connThread.join(); | ||
} catch (InterruptedException ie) { | ||
// Ignore. | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the rationale behind this? Previously we just make sure all access and modification to state is synchronized, are you changing it to
AtomicReference
for performance reasons?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the new code, synchronization would cause a deadlock.
closeAndWait()
inside synchronized block which joins connection threadsetState()
on the handle and cause a deadlockChanging the state should be as thread-safe as before with the new code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense