Skip to content

Fixing a race condition in event listener unit test #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
}

// Exposed for testing
@volatile private[spark] var stopCalled = false

/**
* Start sending events to attached listeners.
*
Expand Down Expand Up @@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}

def stop() {
stopCalled = true
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
test("bus.stop() waits for the event queue to completely drain") {
@volatile var drained = false

// When Listener has started
val listenerStarted = new Semaphore(0)

// Tells the listener to stop blocking
val listenerWait = new Semaphore(1)
val listenerWait = new Semaphore(0)

// When stopper has started
val stopperStarted = new Semaphore(0)

// When stop has returned
val stopReturned = new Semaphore(1)
// When stopper has returned
val stopperReturned = new Semaphore(0)

class BlockingListener extends SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
listenerStarted.release()
listenerWait.acquire()
drained = true
}
Expand All @@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
bus.start()
bus.post(SparkListenerJobEnd(0, JobSucceeded))

// the queue should not drain immediately
listenerStarted.acquire()
// Listener should be blocked after start
assert(!drained)

new Thread("ListenerBusStopper") {
override def run() {
stopperStarted.release()
// stop() will block until notify() is called below
bus.stop()
stopReturned.release(1)
stopperReturned.release()
}
}.start()

while (!bus.stopCalled) {
Thread.sleep(10)
}
stopperStarted.acquire()
// Listener should remain blocked after stopper started
assert(!drained)

// unblock Listener to let queue drain
listenerWait.release()
stopReturned.acquire()
stopperReturned.acquire()
assert(drained)
}

Expand Down