Skip to content

Commit 80b900a

Browse files
author
Marcelo Vanzin
committed
[SPARK-22850][core] Ensure queued events are delivered to all event queues.
The code in LiveListenerBus was queueing events before start in the queues themselves; so in situations like the following: bus.post(someEvent) bus.addToEventLogQueue(listener) bus.start() "someEvent" would not be delivered to "listener" if that was the first listener in the queue. This change buffers the events before starting the bus in the bus itself, so that they can be delivered to all registered queues when the bus is started. Also tweaked the unit tests to cover the behavior above.
1 parent 7570eab commit 80b900a

File tree

2 files changed

+30
-12
lines changed

2 files changed

+30
-12
lines changed

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
6262

6363
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
6464

65+
// Visible for testing.
66+
private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
67+
6568
/** Add a listener to queue shared by all non-internal listeners. */
6669
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
6770
addToQueue(listener, SHARED_QUEUE)
@@ -124,13 +127,19 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
124127
}
125128

126129
/** Post an event to all queues. */
127-
def post(event: SparkListenerEvent): Unit = {
128-
if (!stopped.get()) {
129-
metrics.numEventsPosted.inc()
130+
def post(event: SparkListenerEvent): Unit = synchronized {
131+
if (stopped.get()) {
132+
return
133+
}
134+
135+
metrics.numEventsPosted.inc()
136+
if (started.get()) {
130137
val it = queues.iterator()
131138
while (it.hasNext()) {
132139
it.next().post(event)
133140
}
141+
} else {
142+
queuedEvents += event
134143
}
135144
}
136145

@@ -149,7 +158,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
149158
}
150159

151160
this.sparkContext = sc
152-
queues.asScala.foreach(_.start(sc))
161+
queues.asScala.foreach { q =>
162+
q.start(sc)
163+
queuedEvents.foreach(q.post)
164+
}
165+
queuedEvents = null
153166
metricsSystem.registerSource(metrics)
154167
}
155168

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
4848
bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
4949
}
5050

51-
private def queueSize(bus: LiveListenerBus): Int = {
51+
private def sharedQueueSize(bus: LiveListenerBus): Int = {
5252
bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
5353
.asInstanceOf[Int]
5454
}
@@ -73,12 +73,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
7373
val conf = new SparkConf()
7474
val counter = new BasicJobCounter
7575
val bus = new LiveListenerBus(conf)
76-
bus.addToSharedQueue(counter)
7776

7877
// Metrics are initially empty.
7978
assert(bus.metrics.numEventsPosted.getCount === 0)
8079
assert(numDroppedEvents(bus) === 0)
81-
assert(queueSize(bus) === 0)
80+
assert(bus.queuedEvents.size === 0)
8281
assert(eventProcessingTimeCount(bus) === 0)
8382

8483
// Post five events:
@@ -87,17 +86,23 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
8786
// Five messages should be marked as received and queued, but no messages should be posted to
8887
// listeners yet because the the listener bus hasn't been started.
8988
assert(bus.metrics.numEventsPosted.getCount === 5)
90-
assert(queueSize(bus) === 5)
89+
assert(bus.queuedEvents.size === 5)
90+
91+
// Add the counter to the bus after messages have been queued for later delivery.
92+
bus.addToSharedQueue(counter)
9193
assert(counter.count === 0)
9294

9395
// Starting listener bus should flush all buffered events
9496
bus.start(mockSparkContext, mockMetricsSystem)
9597
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
9698
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
9799
assert(counter.count === 5)
98-
assert(queueSize(bus) === 0)
100+
assert(sharedQueueSize(bus) === 0)
99101
assert(eventProcessingTimeCount(bus) === 5)
100102

103+
// After the bus is started, there should be no more queued events.
104+
assert(bus.queuedEvents === null)
105+
101106
// After listener bus has stopped, posting events should not increment counter
102107
bus.stop()
103108
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
@@ -188,18 +193,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
188193
// Post a message to the listener bus and wait for processing to begin:
189194
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
190195
listenerStarted.acquire()
191-
assert(queueSize(bus) === 0)
196+
assert(sharedQueueSize(bus) === 0)
192197
assert(numDroppedEvents(bus) === 0)
193198

194199
// If we post an additional message then it should remain in the queue because the listener is
195200
// busy processing the first event:
196201
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
197-
assert(queueSize(bus) === 1)
202+
assert(sharedQueueSize(bus) === 1)
198203
assert(numDroppedEvents(bus) === 0)
199204

200205
// The queue is now full, so any additional events posted to the listener will be dropped:
201206
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
202-
assert(queueSize(bus) === 1)
207+
assert(sharedQueueSize(bus) === 1)
203208
assert(numDroppedEvents(bus) === 1)
204209

205210
// Allow the the remaining events to be processed so we can stop the listener bus:

0 commit comments

Comments
 (0)