Skip to content

Commit 10cae04

Browse files
wangshuo128Marcelo Vanzin
authored and
Marcelo Vanzin
committed
[SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError
### What changes were proposed in this pull request? There is a deadlock between `LiveListenerBus#stop` and `AsyncEventQueue#removeListenerOnError`. It can be reproduced as follows: 1. Post some events to `LiveListenerBus`. 2. Call `LiveListenerBus#stop`, which holds the synchronized lock of `bus` (https://github.com/apache/spark/blob/5e92301723464d0876b5a7eec59c15fed0c5b98c/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L229) while waiting until all the events are processed by listeners, and then removes all the queues. 3. Each event queue drains its events by posting them to its listeners. If a listener is interrupted, the queue calls `AsyncEventQueue#removeListenerOnError`, which in turn calls `bus.removeListener` (https://github.com/apache/spark/blob/7b1b60c7583faca70aeab2659f06d4e491efa5c0/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala#L207); that call tries to acquire the synchronized lock of `bus`, resulting in a deadlock. This PR removes the `synchronized` block from `LiveListenerBus.stop` because the underlying data structures themselves are thread-safe. ### Why are the changes needed? To fix the deadlock. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? New UT. Closes #26924 from wangshuo128/event-queue-race-condition. Authored-by: Wang Shuo <wangshuo128@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 1b0570c commit 10cae04

File tree

2 files changed

+72
-4
lines changed

2 files changed

+72
-4
lines changed

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
226226
return
227227
}
228228

229-
synchronized {
230-
queues.asScala.foreach(_.stop())
231-
queues.clear()
232-
}
229+
queues.asScala.foreach(_.stop())
230+
queues.clear()
233231
}
234232

235233
// For testing only.

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,47 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
529529
}
530530
}
531531

532+
// Register the regression test twice: once with a listener that throws InterruptedException
// and once with a listener that only sets the interrupt flag of the calling thread.
for (throwInterruptedException <- Seq(true, false)) {
  val mode = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
  test(s"SPARK-30285: Fix deadlock in AsyncEventQueue.removeListenerOnError: $mode") {
    // Upper bound (10 seconds, in milliseconds) on how long bus.stop() may take.
    val stopTimeoutMillis = 10000L
    val bus = new LiveListenerBus(new SparkConf(false))
    val sharedCounter = new BasicJobCounter()
    val eventLogCounter = new BasicJobCounter()
    val interruptingListener = new DelayInterruptingJobCounter(throwInterruptedException, 3)

    bus.addToSharedQueue(sharedCounter)
    bus.addToSharedQueue(interruptingListener)
    bus.addToEventLogQueue(eventLogCounter)
    assert(bus.activeQueues() === Set(SHARED_QUEUE, EVENT_LOG_QUEUE))
    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
    assert(bus.findListenersByClass[DelayInterruptingJobCounter]().size === 1)

    bus.start(mockSparkContext, mockMetricsSystem)

    for (jobId <- 0 until 5) {
      bus.post(SparkListenerJobEnd(jobId, jobCompletionTime, JobSucceeded))
    }

    // bus.stop() runs on its own thread; calling it here would block this thread until the
    // bus has stopped, and the interrupting listener is only released below.
    val stoppingThread = new Thread(new Runnable {
      override def run(): Unit = bus.stop()
    })
    stoppingThread.start()
    // Release the interrupting listener so it starts handling events.
    interruptingListener.sleep = false
    // Give the stop call a bounded amount of time to complete.
    stoppingThread.join(stopTimeoutMillis)

    // The stop call must have returned within the timeout.
    assert(stoppingThread.isAlive === false)
    // No queue may remain registered after the stop.
    assert(bus.activeQueues() === Set.empty)
    assert(sharedCounter.count === 5)
    assert(eventLogCounter.count === 5)
    assert(interruptingListener.count === 3)
  }
}
572+
532573
test("event queue size can be configued through spark conf") {
533574
// configure the shared queue size to be 1, event log queue size to be 2,
534575
// and listener bus event queue size to be 5
@@ -627,6 +668,35 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
627668
}
628669
}
629670
}
671+
672+
/**
 * A test listener that stalls until released and then simulates an interruption on one job.
 *
 * While `sleep` is true, each `onJobEnd` call waits in 10 ms naps. Once `sleep` is false:
 *  - a job end whose id equals `interruptOnJobId` is not counted; instead the listener either
 *    throws an `InterruptedException` (when `throwInterruptedException` is true) or sets the
 *    interrupt flag of the calling thread;
 *  - every other job end increments `count`.
 */
private class DelayInterruptingJobCounter(
    val throwInterruptedException: Boolean,
    val interruptOnJobId: Int) extends SparkListener {
  // Gate polled by onJobEnd; set to false from outside to let the listener proceed.
  @volatile var sleep = true
  // Number of job-end events handled without triggering the interruption path.
  var count = 0

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    while (sleep) {
      Thread.sleep(10)
    }
    val isTargetJob = jobEnd.jobId == interruptOnJobId
    if (!isTargetJob) {
      count += 1
    } else if (throwInterruptedException) {
      throw new InterruptedException("got interrupted")
    } else {
      // Only mark the current thread as interrupted; nothing is thrown from here.
      Thread.currentThread().interrupt()
    }
  }
}
630700
}
631701

632702
// These classes can't be declared inside of the SparkListenerSuite class because we don't want

0 commit comments

Comments
 (0)