[SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError #26924

Closed
@@ -226,10 +226,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
       return
     }
 
-    synchronized {
-      queues.asScala.foreach(_.stop())
-      queues.clear()
-    }
+    queues.asScala.foreach(_.stop())
+    queues.clear()
   }
 
   // For testing only.
@@ -529,6 +529,47 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
}

Seq(true, false).foreach { throwInterruptedException =>
val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
test(s"SPARK-30285: Fix deadlock in AsyncEventQueue.removeListenerOnError: $suffix") {
val LISTENER_BUS_STOP_WAITING_TIMEOUT_MILLIS = 10 * 1000L // 10 seconds
val bus = new LiveListenerBus(new SparkConf(false))
val counter1 = new BasicJobCounter()
val counter2 = new BasicJobCounter()
val interruptingListener = new DelayInterruptingJobCounter(throwInterruptedException, 3)
bus.addToSharedQueue(counter1)
bus.addToSharedQueue(interruptingListener)
bus.addToEventLogQueue(counter2)
assert(bus.activeQueues() === Set(SHARED_QUEUE, EVENT_LOG_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
assert(bus.findListenersByClass[DelayInterruptingJobCounter]().size === 1)

bus.start(mockSparkContext, mockMetricsSystem)

(0 until 5).foreach { jobId =>
bus.post(SparkListenerJobEnd(jobId, jobCompletionTime, JobSucceeded))
}

// Call bus.stop in a separate thread, otherwise we will block here until bus is stopped
val stoppingThread = new Thread(() => {
bus.stop()
})
stoppingThread.start()
// Notify interrupting listener starts to work
interruptingListener.sleep = false
Contributor

Are you trying to make sure listeners throw the exception after "stop()" is called? That's going to be hard, and your code isn't really guaranteeing that.

You could use a CountDownLatch that you signal right before calling stop() (in the thread) to unblock the listener; that will at least narrow the race down a bit.
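
A minimal sketch of the latch idea above, not code from this PR: `GatedWorker` and `LatchGateSketch` are hypothetical stand-ins for the interrupting listener and the test body, and the real `bus.stop()` call is only marked by a comment. The stopping thread opens the gate immediately before it would call `stop()`, which narrows the race but does not remove it.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchGateSketch {
  // Hypothetical stand-in for a listener that must not proceed until released.
  final class GatedWorker(gate: CountDownLatch) {
    @volatile var released = false

    def onEvent(): Unit = {
      // Block until the gate opens; the timeout keeps a bug from hanging the test.
      released = gate.await(5, TimeUnit.SECONDS)
    }
  }

  def run(): Boolean = {
    val gate = new CountDownLatch(1)
    val worker = new GatedWorker(gate)

    val listenerThread = new Thread(() => worker.onEvent())
    listenerThread.start()

    val stoppingThread = new Thread(() => {
      // Signal right before the (simulated) stop() call.
      gate.countDown()
      // bus.stop() would go here in the real test.
    })
    stoppingThread.start()

    stoppingThread.join(5000)
    listenerThread.join(5000)
    worker.released
  }
}
```

Replacing the `sleep` flag with a latch removes the polling loop in the listener, but the listener can still run before or after the stop logic takes effect.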

Contributor Author

Maybe we could check the stopped status of the bus in the listener.
This would be better than using a CountDownLatch; however, it can't get rid of racing completely. WDYT?

Member

A CountDownLatch always makes things deterministic, and it sounds better to me.

What do you mean by "it can't get rid of racing completely"?

Contributor Author

As the PR description says, to reproduce the original issue we have to make sure that:

  1. The stopping thread holds the synchronized lock of the bus.
  2. The interrupting listener thread tries to acquire the synchronized lock of the bus.

But using a CountDownLatch to signal the listener to start interrupting just before bus.stop can't guarantee this 100%, right?
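
The two conditions can be illustrated with a small, self-contained sketch. It is not Spark code: `LockThenJoinSketch` and its `lock` object are hypothetical, and it assumes that the stopping side waits for the listener thread while still holding the lock. A bounded `join` is used so the sketch observes the blocked state instead of hanging.

```scala
object LockThenJoinSketch {
  // Hypothetical stand-in for the bus monitor.
  private val lock = new Object

  // Returns true if the worker was still blocked when the bounded join expired.
  def workerBlockedWhileLockHeld(): Boolean = {
    // Condition 2: the worker needs the same lock before it can finish.
    val worker = new Thread(() => lock.synchronized { () })
    worker.setDaemon(true)

    // Condition 1: this thread holds the lock while waiting for the worker.
    val stillAlive = lock.synchronized {
      worker.start()
      worker.join(200) // an unbounded join() here would never return
      worker.isAlive
    }

    // The lock is released now, so the worker can complete.
    worker.join(5000)
    stillAlive
  }
}
```

With an unbounded wait in place of `join(200)`, neither thread could make progress, which is the shape of the hang described in the two conditions.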

Member

Maybe you should insert the CountDownLatch after bus.stop?

Member

Unfortunately, checking the stopped status can't guarantee this. It's likely that the bus has already set the stopped status to true but has not acquired the synchronized lock yet.

IIUC, you want to let interruptingListener start to work once the bus has moved to the stopped status and acquired the synchronized lock, right?

But how can the bus acquire the synchronized lock now? This fix has already removed the synchronized lock. The only thing you can do now is check the bus status, and I think that's enough.

Contributor Author

@wangshuo128, Dec 26, 2019

Got your point.

Now, there are two things to consider.

  1. Without the fix, how the test would behave.
  2. With the fix, how to make sure that there is no deadlock when a listener is interrupted after bus.stop is called.

For (1), we can't avoid the race without changing the bus.stop code (e.g. adding a callback).
For (2), we at least have to expose the internal stopped status of the bus, which may not be advisable.

So WDYT?

Member

Focusing only on LiveListenerBus, it may be impossible to work around the difficulties you mentioned above. Maybe we should move to AsyncEventQueue.

How about this:

  1. Add a method status() in AsyncEventQueue, for testing only.

  2. In interruptingListener, keep checking AsyncEventQueue.status() until it's stopped. Then, when AsyncEventQueue is stopped, we're sure that LiveListenerBus has stopped too and has acquired the lock (without the fix).

WDYT?
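
The polling step in this proposal could look roughly like the sketch below. Everything in it is hypothetical: `FakeQueue` stands in for a queue that exposes its stopped flag for tests, and `awaitStopped` is an illustrative helper, not an existing Spark method.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object PollStoppedSketch {
  // Hypothetical stand-in for a queue exposing a test-only stopped flag.
  final class FakeQueue {
    private val stopped = new AtomicBoolean(false)
    def stop(): Unit = stopped.set(true)
    def isStopped: Boolean = stopped.get()
  }

  // Poll until the queue reports stopped or the timeout elapses.
  // Returns whether the stopped state was observed.
  def awaitStopped(queue: FakeQueue, timeoutMs: Long): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    while (!queue.isStopped && System.nanoTime() - deadline < 0) {
      Thread.sleep(10)
    }
    queue.isStopped
  }

  def run(): Boolean = {
    val queue = new FakeQueue
    // Simulated stopping thread: flips the flag after a short delay.
    val stopper = new Thread(() => { Thread.sleep(50); queue.stop() })
    stopper.start()

    val seen = awaitStopped(queue, 5000)
    stopper.join(5000)
    seen
  }
}
```

Observing the flag only shows that the stop logic has started; it says nothing about which locks the stopping thread holds at that moment.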

Contributor Author

@wangshuo128, Dec 26, 2019

I believe this would work. In fact, AsyncEventQueue also has a stopped status that we could check.
But associating a listener with its AsyncEventQueue would be another problem to resolve. Currently, that association is encapsulated by bus.addToXXXQueue inside the bus code.

Contributor

You guys are trying to fabricate a test that will not be testing what the actual code is doing when a real app is running. That's the problem.

To do that you'd need the stop() code in the listener bus to wait, holding a lock, while the queues are being drained, and one of those queues needs to run into the error that causes it to remove a bad listener. That's hard to do without inserting callbacks that don't exist into the code, and adding those callbacks would only be enabling the test, which is why that's questionable.

So you basically need this in the new stop():

def stop() {
  // do some stop stuff here
  testStartCallback()
  // clear the queues here
  testEndCallback()
}

The two callbacks are needed because otherwise there is no guarantee that what the queues do will happen before stop() does its thing.

But really, I don't see what that test would actually be testing now that there is no synchronized block anymore.

Anything you do here without these callbacks will be racy, and thus may not hit the original issue. Also, without the synchronized block, there's nothing to cause a deadlock in the first place, so that's why I said the test isn't that great to begin with.

So I'd avoid trying to create a fancy test that isn't really testing the issue and just adding unneeded hooks into the main code. The current test is ok and as close as you'll get without the above callbacks; so either go with that, or just remove the test.

// Wait for bus to stop
stoppingThread.join(LISTENER_BUS_STOP_WAITING_TIMEOUT_MILLIS)

// Stopping has been finished
assert(stoppingThread.isAlive === false)
// All queues are removed
assert(bus.activeQueues() === Set.empty)
assert(counter1.count === 5)
assert(counter2.count === 5)
assert(interruptingListener.count === 3)
}
}

test("event queue size can be configued through spark conf") {
// configure the shared queue size to be 1, event log queue size to be 2,
// and listner bus event queue size to be 5
@@ -627,6 +668,35 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
}
}

/**
* A simple listener that works as follows:
* 1. sleep and wait when `sleep` is true
* 2. when `sleep` is false, start to work:
* if it is interruptOnJobId, interrupt
* else count SparkListenerJobEnd numbers
*/
private class DelayInterruptingJobCounter(
val throwInterruptedException: Boolean,
val interruptOnJobId: Int) extends SparkListener {
@volatile var sleep = true
var count = 0

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
while (sleep) {
Thread.sleep(10)
}
if (interruptOnJobId == jobEnd.jobId) {
if (throwInterruptedException) {
throw new InterruptedException("got interrupted")
} else {
Thread.currentThread().interrupt()
}
} else {
count += 1
}
}
}
}

// These classes can't be declared inside of the SparkListenerSuite class because we don't want