-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-22850][core] Ensure queued events are delivered to all event queues. #20039
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,6 +62,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) { | |
|
||
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() | ||
|
||
// Visible for testing. | ||
@volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]() | ||
|
||
/** Add a listener to queue shared by all non-internal listeners. */ | ||
def addToSharedQueue(listener: SparkListenerInterface): Unit = { | ||
addToQueue(listener, SHARED_QUEUE) | ||
|
@@ -125,13 +128,39 @@ private[spark] class LiveListenerBus(conf: SparkConf) { | |
|
||
/** Post an event to all queues. */ | ||
def post(event: SparkListenerEvent): Unit = { | ||
if (!stopped.get()) { | ||
metrics.numEventsPosted.inc() | ||
val it = queues.iterator() | ||
while (it.hasNext()) { | ||
it.next().post(event) | ||
if (stopped.get()) { | ||
return | ||
} | ||
|
||
metrics.numEventsPosted.inc() | ||
|
||
// If the event buffer is null, it means the bus has been started and we can avoid | ||
// synchronization and post events directly to the queues. This should be the most | ||
// common case during the life of the bus. | ||
if (queuedEvents == null) { | ||
postToQueues(event) | ||
return | ||
} | ||
|
||
// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread | ||
// calling start() picks up the new event. | ||
synchronized { | ||
if (!started.get()) { | ||
queuedEvents += event | ||
return | ||
} | ||
} | ||
|
||
// If the bus was already started when the check above was made, just post directly to the | ||
// queues. | ||
postToQueues(event) | ||
} | ||
|
||
private def postToQueues(event: SparkListenerEvent): Unit = { | ||
val it = queues.iterator() | ||
while (it.hasNext()) { | ||
it.next().post(event) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -149,7 +178,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) { | |
} | ||
|
||
this.sparkContext = sc | ||
queues.asScala.foreach(_.start(sc)) | ||
queues.asScala.foreach { q => | ||
q.start(sc) | ||
queuedEvents.foreach(q.post) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ummm... In my opinion, exchange these two lines sequence would be better for following the original logic of events buffered before a queue calls start(). So, queuedEvents post to queues first before queues start would be unified logically. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That really does not make any difference in behavior. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if stop() called before all queuedEvents post to AsyncEventQueue?
(the "queued events" mentioned in description above is not equal to "queuedEvents" here.) As queuedEvents "post" before listeners install, so, can they be treated as new events? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. both There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LiveListener's stop() is not synchronized completely. And LiveListener's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, but the interleaving you were worried about specifically above isn't possible because of the synchronized blocks. Yes, you could get one line into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, perfect explanation. Thanks. |
||
} | ||
queuedEvents = null | ||
metricsSystem.registerSource(metrics) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if stop() called after the null judge and before postToQueues() call ?
Do you think we should check the stopped.get() in postToQueues()?
like:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this would help at all. IIUC, you're saying that its possible that a
post()
and astop()
are racing against each other, and we might post to a queue after its been stopped. But thats OK -- the queues are prepared to deal with this.Its also possible that during that race, the event makes it one queue before that queue is stopped, but to another queue after its stopped. Which means that final event only makes it to some queues. Again, I think that is fine, and your change wouldn't help anyway, that would still be possible.