Skip to content

Commit 7b8b1fa

Browse files
committed
Address review feedback
1 parent 781caba commit 7b8b1fa

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

core/src/main/java/org/apache/spark/SparkFirehoseListener.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,17 @@
3030
*/
3131
public class SparkFirehoseListener implements SparkListenerInterface {
3232

33+
protected volatile boolean dead;
34+
3335
@Override
3436
public boolean dead() {
35-
return false;
37+
return dead;
3638
}
3739

3840
@Override
39-
public void dead_$eq(boolean dead) { }
41+
public void dead_$eq(boolean dead) {
42+
this.dead = dead;
43+
}
4044

4145
public void onEvent(SparkListenerEvent event) { }
4246

core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,9 @@ private class AsyncEventQueue(
210210

211211
override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
212212
if (bus.isInStop) {
213-
// If bus is in the progress of stop, we just mark the listener as dead instead of removing
214-
// via calling `bus.removeListener` to avoid race condition
215-
// dead listeners will be removed eventually in `bus.stop`
213+
// If we're in the middle of stopping the bus, we just mark the listener as dead,
214+
// instead of removing, to avoid a deadlock.
215+
// Dead listeners will be removed eventually in `bus.stop`.
216216
listener.dead = true
217217
} else {
218218
// the listener failed in an unrecoverably way, we want to remove it from the entire

0 commit comments

Comments
 (0)