[SPARK-20863] Add metrics/instrumentation to LiveListenerBus #18083
Conversation
Test build #77277 has finished for PR 18083 at commit
Test build #77289 has started for PR 18083 at commit
Context for review: a large portion of the diff in this patch was undoing changes to the LiveListenerBus constructor and
Test build #77305 has finished for PR 18083 at commit
queueSize
private val eventQueue = {
  val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
  require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!")
This constraint can be put in LISTENER_BUS_EVENT_QUEUE_SIZE itself with TypedConfigBuilder.checkValue.
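For reference, here's roughly what that could look like (a sketch assuming Spark's internal ConfigBuilder API; the default value shown is just illustrative):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Sketch: move the "> 0" requirement into the config definition itself, so every
// consumer of LISTENER_BUS_EVENT_QUEUE_SIZE gets the validation for free.
private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
    .intConf
    .checkValue(_ > 0, "spark.scheduler.listenerbus.eventqueue.size must be > 0!")
    .createWithDefault(10000) // illustrative default
```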
Nice, I didn't know about that. I'll move it in my next update.
val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
/**
 * The number of of messages waiting in the queue.
nit: double "of" here
/**
 * The number of of messages waiting in the queue.
 */
val queueSize: Gauge[Int] = {
Do we need this metric? Users can easily get it by looking at the spark.scheduler.listenerbus.eventqueue.size config.
It's the queue's capacity that's fixed. In https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html, size refers to the number of items currently in the queue, whereas capacity refers to the maximum number of items that the queue can hold. I think the spark.scheduler.listenerbus.eventqueue.size configuration is confusingly named.
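To illustrate the distinction with plain java.util.concurrent (nothing Spark-specific here):

```scala
import java.util.concurrent.LinkedBlockingQueue

val queue = new LinkedBlockingQueue[String](10000) // capacity: the fixed upper bound
queue.put("event-1")
queue.put("event-2")
queue.size()              // 2: number of items currently waiting in the queue
queue.remainingCapacity() // 9998: capacity minus current size
```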
ah i see
/**
 * The total number of events posted to the LiveListenerBus. This counts the number of times
 * that `post()` is called, which might be less than the total number of events processed in
 * case events are dropped.
According to the code, we also count dropped events, don't we?
Yes. I perhaps wasn't explicit enough in the comment, so I'll reword it or just drop the confusing second half.
Is it clearer if I say "This counts the number of times that post() has been called, not the total number of events that have completed processing"?
@@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    metrics.numEventsReceived.inc()
here we also count dropped events?
Yes. My idea was to have a counter which is incremented whenever an event is received, regardless of how it ends up being processed.
val eventAdded = eventQueue.offer(event)
if (eventAdded) {
  eventLock.release()
} else {
  onDropEvent(event)
  metrics.numDroppedEvents.inc()
Is it better to move this to onDropEvent?
Sure, will do.
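For clarity, a sketch (not the exact patch) of how post() and onDropEvent would fit together after that change, using the LiveListenerBus members already shown in the diff hunks above:

```scala
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get) {
    logError(s"$name has already stopped! Dropping event $event")
    return
  }
  // Counts every event handed to the bus, whether or not it is later dropped.
  metrics.numEventsReceived.inc()
  if (eventQueue.offer(event)) {
    eventLock.release()
  } else {
    onDropEvent(event)
  }
}

def onDropEvent(event: SparkListenerEvent): Unit = {
  // Dropped-event counter moved here from post(), per the suggestion above;
  // the existing "dropped event" logging would remain alongside it.
  metrics.numDroppedEvents.inc()
}
```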
I think it is quite interesting to have performance counters on the dequeuing process in the LiveListenerBus. I am not sure that monitoring the number of dropped events with real metrics is really worth it: you just want to know whether messages have been dropped, and having the number in the log is fine. I agree that having the number of messages in the queue is important. The number of processed events is, in my mind, not so important either, just like the number of dropped events. The execution time of message processing is very interesting, but without per-listener or per-event-type breakdowns (just the global timing) it will not allow fine-grained analysis, and so won't point the way to improvements.
Even if the absolute number of dropped events doesn't matter that much I would still like to have this metric: it's simple to implement, and being able to use my existing metrics-based monitoring infrastructure to correlate dropped events with other signals can be helpful.
For now my timing is capturing the total time to process each message, counting the time to dequeue plus the aggregate time across all of the listeners. Given the current single-threaded processing strategy this is still a useful signal, even if not quite as useful as per-listener metrics. I agree that per-listener metrics would be more useful, though, so let me see if there's a clean refactoring to get the metrics at the per-listener level.
I considered this and I'll look into it, but it's less of a priority for me given that I'm mostly concerned about perf. bottlenecks in LiveListenerBus event delivery. The other listener busses don't queue/drop events and the two that you mentioned are actually wrapping
Okay, I took a shot at adding timing metrics on a per-listener-class basis. I'm not sure if my way of integrating these timers is the best, though, so let's hold off on merging this until we've had time to discuss it and do a round of revisions.
 */
def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
  synchronized {
    val className = listener.getClass.getName
Is it possible that users register the same listener twice? Then the class name may not be a good identifier for listeners. I think this is the main problem with having per-listener metrics: how do we identify each listener?
Yes, but my goal with these metrics is to be able to identify which listeners are causing performance problems, and for that purpose it's more useful to group listeners by class rather than to instrument individual listeners. Most (all?) of Spark's internal listeners have one instance per driver / SparkContext, so in practice keeping track of stats on a per-instance basis wouldn't actually make a meaningful difference in typical cases.
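A tiny standalone illustration of that grouping: because Dropwizard's MetricRegistry returns the already-registered metric for a given name, two instances of the same listener class resolve to a single timer (MyListener and the metric name below are made up for the example):

```scala
import com.codahale.metrics.{MetricRegistry, Timer}

val registry = new MetricRegistry

def timerFor(listener: AnyRef): Timer =
  registry.timer(MetricRegistry.name("listenerProcessingTime", listener.getClass.getName))

class MyListener // stand-in for some SparkListener subclass

// Same class name => same Timer instance, so stats are aggregated per class.
assert(timerFor(new MyListener) eq timerFor(new MyListener))
```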
I'll update the PR description to discuss this per-listener metric.
Test build #77395 has finished for PR 18083 at commit
/**
 * Add a listener to listen events. This method is thread-safe and can be called in any thread.
 */
final def addListener(listener: L): Unit = {
  listeners.add(listener)
  listenersPlusTimers.add((listener, createTimer(listener).orNull))
Why not keep the Option in the collection instead of putting null?
val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
var maybeTimerContext = if (maybeTimer != null) {
With an Option (instead of a null value) it would be much simpler.
Yeah, this is just premature optimization. I'll undo.
Actually, there is a cost here: allocating a new Option on every postToAll is going to create more allocations and method calls. Thus I'm going to leave this unchanged.
Indeed! But you can put the Option in the collection listenersPlusTimers (instead of doing an orNull when you create the timer), and then you can use it without having to recreate one each time in the postToAll method.
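For comparison, a stripped-down sketch of that variant (names follow the diff; exception handling omitted). Pattern matching on the stored Option avoids both the null checks and any per-event Option allocation:

```scala
import java.util.concurrent.CopyOnWriteArrayList
import com.codahale.metrics.Timer

trait TimedListenerBusSketch[L <: AnyRef, E] {
  protected def createTimer(listener: L): Option[Timer]
  protected def doPostEvent(listener: L, event: E): Unit

  // The Option itself is stored, so addListener needs no orNull and postToAll no null check.
  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  final def addListener(listener: L): Unit =
    listenersPlusTimers.add((listener, createTimer(listener)))

  def postToAll(event: E): Unit = {
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val (listener, maybeTimer) = iter.next()
      maybeTimer match {
        case Some(timer) =>
          val ctx = timer.time() // Timer.Context for this dispatch
          try doPostEvent(listener, event) finally ctx.stop()
        case None =>
          doPostEvent(listener, event)
      }
    }
  }
}
```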
try {
  doPostEvent(listener, event)
} catch {
  case NonFatal(e) =>
    logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
} finally {
  if (maybeTimerContext != null) {
Same: simpler with an Option.
@@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
  }
}

override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
  if (listener.getClass.getName.startsWith("org.apache.spark")) {
Why create timers just for "spark" listeners? We may want timings even for "third-party" listeners. In my mind it is even more important for those listeners, because they can be much less optimized and so bring a huge performance penalty.
This is accounted for in a later commit. All listeners are now captured.
Test build #77758 has finished for PR 18083 at commit
Test build #77756 has finished for PR 18083 at commit
def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
  synchronized {
    val className = cls.getName
    val maxTimed = 128
should this be configurable?
Shrug. Maybe, but note that this would be 128 separate listener classes. Let me put in an undocumented configuration.
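Something along these lines, perhaps; the config key below is purely hypothetical (the actual undocumented setting may end up with a different name), and the warning that would be logged when the cap is hit is elided:

```scala
import scala.collection.mutable
import com.codahale.metrics.{MetricRegistry, Timer}
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListenerInterface

class ListenerBusMetricsSketch(conf: SparkConf, metricRegistry: MetricRegistry) {

  // Hypothetical, undocumented escape hatch; 128 matches the hard-coded value above.
  private val maxTimedListenerClasses =
    conf.getInt("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed", 128)

  private val perListenerClassTimers = mutable.Map[String, Timer]()

  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] =
    synchronized {
      val className = cls.getName
      perListenerClassTimers.get(className).orElse {
        if (perListenerClassTimers.size >= maxTimedListenerClasses) {
          None // stop registering new per-class timers once the cap is reached
        } else {
          val timer =
            metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))
          perListenerClassTimers(className) = timer
          Some(timer)
        }
      }
    }
}
```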
import org.apache.spark.internal.Logging

/**
 * An event bus which posts events to its listeners.
 */
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
Shall we use Option[Timer] as the value type?
}

/**
 * Remove a listener and it won't receive any events. This method is thread-safe and can be called
 * in any thread.
 */
final def removeListener(listener: L): Unit = {
  listeners.remove(listener)
  listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
    listenersPlusTimers.remove(listenerAndTimer)
Since this is a CopyOnWriteArrayList, shall we just do a filter and create a new array?
I think the only reason that CopyOnWriteArrayList was used was for thread-safety and fast performance for readers interleaved with very rare mutations / writes. If we were to replace the array list then we'd need to add a synchronized to guard the listenersPlusTimers field itself. Given the workload and access patterns here, I'm not sure that it's worth it to attempt to optimize this removeListener() method any further.
// Post a message to the listener bus and wait for processing to begin:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
Is it guaranteed that this line will run before listenerStarted.release() in onJobEnd?
Actually the order doesn't matter: if release is called first, acquire won't block.
This should be fine:
- If this code runs before listenerStarted.release() then it will block until listenerStarted.release() is hit.
- The listener will block in listenerWait.acquire() until we call listenerWait.release() further down in this method.
This synchronization pattern is already used elsewhere in this suite in https://github.com/JoshRosen/spark/blob/76b669ca6eb35a0cce4291702baa5d1f60adb467/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala#L113
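A self-contained sketch of that two-semaphore handshake, with the listener side reduced to a plain thread (names mirror the test but are only illustrative):

```scala
import java.util.concurrent.Semaphore

val listenerStarted = new Semaphore(0)
val listenerWait = new Semaphore(0)

// Stands in for the blocking listener's onJobEnd:
val listenerThread = new Thread(new Runnable {
  override def run(): Unit = {
    listenerStarted.release() // signal: "I've started processing the event"
    listenerWait.acquire()    // block until the test lets us finish
  }
})
listenerThread.start()

// Test side: whichever side reaches its call first, acquire() just waits for a
// permit to become available, so the ordering question above doesn't matter.
listenerStarted.acquire()
// ... assertions while the listener is blocked would go here ...
listenerWait.release()
listenerThread.join()
```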
LGTM except one question.
Test build #77821 has finished for PR 18083 at commit
Thanks, merging to master!
What changes were proposed in this pull request?

This patch adds Coda Hale metrics for instrumenting the LiveListenerBus in order to track the number of events received, dropped, and processed. In addition, it adds per-SparkListener-subclass timers to track message processing time. This is useful for identifying when slow third-party SparkListeners cause performance bottlenecks. See the new LiveListenerBusMetrics for a complete description of the new metrics.

How was this patch tested?

New tests in SparkListenerSuite, including a test to ensure proper counting of dropped listener events.