Skip to content

Commit f0abf5f

Browse files
kanzhangrxin
authored andcommitted
Fixing a race condition in event listener unit test
Author: Kan Zhang <kzhang@apache.org> Closes #401 from kanzhang/fix-1475 and squashes the following commits: c6058bd [Kan Zhang] Fixing a race condition in event listener unit test (cherry picked from commit 38877cc) Signed-off-by: Reynold Xin <rxin@apache.org>
1 parent e43e31d commit f0abf5f

File tree

2 files changed

+19
-13
lines changed

2 files changed

+19
-13
lines changed

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
5050
}
5151
}
5252

53-
// Exposed for testing
54-
@volatile private[spark] var stopCalled = false
55-
5653
/**
5754
* Start sending events to attached listeners.
5855
*
@@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
9794
}
9895

9996
def stop() {
100-
stopCalled = true
10197
if (!started) {
10298
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
10399
}

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
7777
test("bus.stop() waits for the event queue to completely drain") {
7878
@volatile var drained = false
7979

80+
// When Listener has started
81+
val listenerStarted = new Semaphore(0)
82+
8083
// Tells the listener to stop blocking
81-
val listenerWait = new Semaphore(1)
84+
val listenerWait = new Semaphore(0)
85+
86+
// When stopper has started
87+
val stopperStarted = new Semaphore(0)
8288

83-
// When stop has returned
84-
val stopReturned = new Semaphore(1)
89+
// When stopper has returned
90+
val stopperReturned = new Semaphore(0)
8591

8692
class BlockingListener extends SparkListener {
8793
override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
94+
listenerStarted.release()
8895
listenerWait.acquire()
8996
drained = true
9097
}
@@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
97104
bus.start()
98105
bus.post(SparkListenerJobEnd(0, JobSucceeded))
99106

100-
// the queue should not drain immediately
107+
listenerStarted.acquire()
108+
// Listener should be blocked after start
101109
assert(!drained)
102110

103111
new Thread("ListenerBusStopper") {
104112
override def run() {
113+
stopperStarted.release()
105114
// stop() will block until notify() is called below
106115
bus.stop()
107-
stopReturned.release(1)
116+
stopperReturned.release()
108117
}
109118
}.start()
110119

111-
while (!bus.stopCalled) {
112-
Thread.sleep(10)
113-
}
120+
stopperStarted.acquire()
121+
// Listener should remain blocked after stopper started
122+
assert(!drained)
114123

124+
// unblock Listener to let queue drain
115125
listenerWait.release()
116-
stopReturned.acquire()
126+
stopperReturned.acquire()
117127
assert(drained)
118128
}
119129

0 commit comments

Comments
 (0)