
[SPARK-20863] Add metrics/instrumentation to LiveListenerBus #18083


Closed
wants to merge 12 commits into master from JoshRosen:listener-bus-metrics

Conversation

@JoshRosen
Contributor

JoshRosen commented May 24, 2017

What changes were proposed in this pull request?

This patch adds Coda Hale metrics for instrumenting the LiveListenerBus in order to track the number of events received, dropped, and processed. In addition, it adds per-SparkListener-subclass timers to track message processing time. This is useful for identifying when slow third-party SparkListeners cause performance bottlenecks.

See the new LiveListenerBusMetrics for a complete description of the new metrics.
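For orientation, here is a rough sketch (not the actual LiveListenerBusMetrics source; the class name is illustrative, the field names follow the diff discussed below) of how such Coda Hale counters, a timer, and a queue-size gauge can be registered with a Dropwizard MetricRegistry:

import java.util.concurrent.LinkedBlockingQueue

import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}

class ListenerBusMetricsSketch(queue: LinkedBlockingQueue[AnyRef]) {
  val metricRegistry = new MetricRegistry()

  // Incremented on every post(), whether or not the event is later dropped.
  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))

  // Incremented when the bounded queue is full and an event has to be discarded.
  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numDroppedEvents"))

  // Times the end-to-end processing of each dequeued event.
  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))

  // Number of events currently waiting in the queue (its size, not its capacity).
  val queueSize: Gauge[Int] = metricRegistry.register(MetricRegistry.name("queueSize"),
    new Gauge[Int] {
      override def getValue: Int = queue.size()
    })
}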

How was this patch tested?

New tests in SparkListenerSuite, including a test to ensure proper counting of dropped listener events.

@SparkQA

SparkQA commented May 24, 2017

Test build #77277 has finished for PR 18083 at commit a1fb5a8.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 24, 2017

Test build #77289 has started for PR 18083 at commit a46c247.

@JoshRosen
Contributor Author

Context for review: a large portion of the diff in this patch comes from undoing changes to the LiveListenerBus constructor and start() method which were introduced in #14269. That patch introduced a bunch of implicit initialization-order constraints in the form of lazy vals, which complicated using those values in metrics gauges. See my comments over on that other PR for more details.

@SparkQA

SparkQA commented May 24, 2017

Test build #77305 has finished for PR 18083 at commit 378206e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private val eventQueue = {
val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!")
Contributor

this constraint can be put in LISTENER_BUS_EVENT_QUEUE_SIZE with TypedConfigBuilder.checkValue

Contributor Author

Nice, I didn't know about that. I'll move it in my next update.
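For reference, moving the bound into the config definition would look roughly like this. This is a sketch using Spark's internal ConfigBuilder; the key matches the existing spark.scheduler.listenerbus.eventqueue.size entry, while the default value shown here is an assumption:

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
    .intConf
    .checkValue(_ > 0, "The capacity of the listener bus event queue must be positive")
    .createWithDefault(10000)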

val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))

/**
* The number of of messages waiting in the queue.
Contributor

nit: double of here

/**
* The number of of messages waiting in the queue.
*/
val queueSize: Gauge[Int] = {
Contributor

do we need this metric? Users can easily get it by looking at the spark.scheduler.listenerbus.eventqueue.size config.

Contributor Author

It's the queue's capacity that's fixed. In https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html size refers to the number of items currently in the queue, whereas capacity refers to the maximum number of items that the queue can hold. I think the spark.scheduler.listenerbus.eventqueue.size configuration is confusingly named.
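A quick illustration of the distinction:

import java.util.concurrent.LinkedBlockingQueue

val queue = new LinkedBlockingQueue[String](10000) // capacity: the fixed upper bound
queue.offer("event")
queue.size()              // 1    -> events currently waiting; this is what the new gauge reports
queue.remainingCapacity() // 9999 -> capacity minus current size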

Contributor

ah i see

/**
* The total number of events posted to the LiveListenerBus. This counts the number of times
* that `post()` is called, which might be less than the total number of events processed in
* case events are dropped.
Contributor

according to the code, we also count dropped events, don't we?

Contributor Author

Yes. I perhaps wasn't explicit enough in the comment, so I'll reword it or just drop the confusing second half.

Is it clearer if I say

This counts the number of times that post() has been called, not the total number of events that have completed processing.

@@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
logError(s"$name has already stopped! Dropping event $event")
return
}
metrics.numEventsReceived.inc()
Contributor

here we also count dropped events?

Contributor Author

Yes. My idea was to have a counter which is incremented whenever an event is received, regardless of how it ends up being processed.

val eventAdded = eventQueue.offer(event)
if (eventAdded) {
eventLock.release()
} else {
onDropEvent(event)
metrics.numDroppedEvents.inc()
Contributor

is it better to move this to onDropEvent?

Contributor Author

Sure, will do.
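Something along these lines (sketch; the existing one-time error logging inside onDropEvent is elided):

def onDropEvent(event: SparkListenerEvent): Unit = {
  metrics.numDroppedEvents.inc()
  // ... existing "dropping event" error logging stays here ...
}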

@bOOm-X

bOOm-X commented May 25, 2017

I think it is quite interesting to have performance counters on the dequeuing process in the LiveListenerBus.

I am not sure that monitoring (with real metrics) the number of dropped events is really worth it. You just want to know whether messages have been dropped (and having the number in the log is fine).

I agree that having the number of messages in the queue is important.

The number of processed events is, in my mind, not so important either, just like the number of dropped events.

The execution time of message processing is very interesting, but without per-listener or per-event-type breakdowns (just the global timing) it will not allow a fine-grained analysis, and so no targeted improvements.
I think it would be better to have these timings for each listener individually (more than the dequeuing process itself, the performance improvements will be achieved at the listener level).
So putting the counters in ListenerBus is more appropriate for me. This would allow monitoring not only the LiveListenerBus but the other buses too (like StreamingQueryListenerBus, StreamingListenerBus, ...)

@JoshRosen
Contributor Author

I am not sure that monitoring (with real metrics) the number of dropped events is really worth it. You just want to know whether messages have been dropped (and having the number in the log is fine).

Even if the absolute number of dropped events doesn't matter that much, I would still like to have this metric: it's simple to implement, and being able to use my existing metrics-based monitoring infrastructure to correlate dropped events with other signals can be helpful.

The execution time of message processing is very interesting, but without per-listener or per-event-type breakdowns (just the global timing) it will not allow a fine-grained analysis, and so no targeted improvements.

For now my timing captures the total time to process each message, counting the time to dequeue plus the aggregate time across all of the listeners (see the sketch at the end of this comment). Given the current single-threaded processing strategy this is still a useful signal, even if not quite as useful as per-listener metrics. I agree that per-listener metrics would be more useful, though, so let me see if there's a clean refactoring to get the metrics at the per-listener level.

So putting the counters in ListenerBus is more appropriate for me. This would allow monitoring not only the LiveListenerBus but the other buses too (like StreamingQueryListenerBus, StreamingListenerBus, ...)

I considered this and I'll look into it, but it's less of a priority for me given that I'm mostly concerned about perf. bottlenecks in LiveListenerBus event delivery. The other listener buses don't queue/drop events, and the two that you mentioned actually wrap LiveListenerBus: they are both listener bus implementations and listeners themselves. Thus my cop-out suggestion is going to be to deal with those in a follow-up PR.
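Coming back to the global timing mentioned above: it amounts to wrapping each dequeued message in a Dropwizard Timer context, roughly like the following sketch (the dispatch call shown is illustrative):

val timerContext = metrics.eventProcessingTime.time()
try {
  postToAll(event) // dispatch to every registered listener
} finally {
  timerContext.stop()
}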

@JoshRosen
Contributor Author

Okay, I took a shot at adding timing metrics on a per-listener-class basis. I'm not sure if my way of integrating these timers is the best, though, so let's hold off on merging this until we've had time to discuss it and do a round of revisions.

*/
def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
synchronized {
val className = listener.getClass.getName
Contributor

is it possible that users register the same listener twice? Then the class name may not be a good identifier for listeners. I think this is the main problem with having listener-wise metrics: how to identify each listener?

Contributor Author

Yes, but my goal with these metrics is to be able to identify which listeners are causing performance problems, and for that purpose it's more useful to group listeners by class rather than to instrument individual listeners. Most (all?) of Spark's internal listeners have one instance per driver / SparkContext, so in practice keeping track of stats on a per-instance basis wouldn't make a meaningful difference in typical cases.

Contributor Author

I'll update the PR description to discuss this per-listener metric.

@SparkQA

SparkQA commented May 26, 2017

Test build #77395 has finished for PR 18083 at commit 60c7448.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging
  • logError(s\"Not measuring processing time for listener class $className because a \" +


/**
* Add a listener to listen events. This method is thread-safe and can be called in any thread.
*/
final def addListener(listener: L): Unit = {
listeners.add(listener)
listenersPlusTimers.add((listener, createTimer(listener).orNull))

Why not keep the Option in the collection instead of putting null?

val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
var maybeTimerContext = if (maybeTimer != null) {

With an Option (instead of a null value) it would be much simpler

Contributor Author

Yeah, this is just premature optimization. I'll undo.

Contributor Author

Actually, there is a cost here: allocating a new Option on every postToAll is going to create more allocations and method calls. Thus I'm going to leave this unchanged.


Indeed! But you can put the Option in the listenersPlusTimers collection (instead of calling orNull when you create the timer), so you can use it without having to recreate one each time in the postToAll method.
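In code, that suggestion amounts to something like the following sketch (only the relevant pieces are shown):

import java.util.concurrent.CopyOnWriteArrayList

import com.codahale.metrics.Timer

private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

final def addListener(listener: L): Unit = {
  listeners.add(listener)
  // Store the Option itself; postToAll can then pattern-match on it without
  // allocating a new wrapper per posted event.
  listenersPlusTimers.add((listener, createTimer(listener)))
}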

try {
doPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
} finally {
if (maybeTimerContext != null) {

Same: simpler with an Option.

@@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
}
}

override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
if (listener.getClass.getName.startsWith("org.apache.spark")) {

Why create timers just for "spark" listeners? We may want timings for "third-party" listeners too. In my mind it is even more important for these listeners, because they can be much less optimized and so bring a huge performance penalty.

Contributor Author

This is accounted for in a later commit. All listeners are now captured.
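For reference, the later commit's behavior is roughly the following sketch (exact wording may differ from the final code):

override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
  // Timers are now requested for every listener class, not just org.apache.spark ones.
  metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
}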

@SparkQA

SparkQA commented Jun 5, 2017

Test build #77758 has finished for PR 18083 at commit d1a5e99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 5, 2017

Test build #77756 has finished for PR 18083 at commit 4a083de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
synchronized {
val className = cls.getName
val maxTimed = 128
Contributor

should this be configurable?

Contributor Author

Shrug. Maybe, but note that this would be 128 separate listener classes. Let me put in an undocumented configuration.
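Roughly what the capped per-class timer lookup looks like, as a sketch (the cap is hard-coded here; per the exchange above it would move behind an undocumented configuration):

import scala.collection.mutable

import com.codahale.metrics.{MetricRegistry, Timer}

private val perListenerClassTimers = mutable.Map[String, Timer]()

def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = synchronized {
  val className = cls.getName
  val maxTimed = 128
  perListenerClassTimers.get(className).orElse {
    if (perListenerClassTimers.size >= maxTimed) {
      logError(s"Not measuring processing time for listener class $className because the " +
        "maximum number of timed listener classes has been reached.")
      None
    } else {
      perListenerClassTimers(className) =
        metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))
      perListenerClassTimers.get(className)
    }
  }
}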

import org.apache.spark.internal.Logging

/**
* An event bus which posts events to its listeners.
*/
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
Contributor

shall we use Option[Timer] as the value type?

}

/**
* Remove a listener and it won't receive any events. This method is thread-safe and can be called
* in any thread.
*/
final def removeListener(listener: L): Unit = {
listeners.remove(listener)
listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
listenersPlusTimers.remove(listenerAndTimer)
Contributor

since this is a CopyOnWriteArrayList, shall we just do a filter and create a new array?

Contributor Author

I think the only reason that CopyOnWriteArrayList was used was for thread-safety and fast performance for readers interleaved with very rare mutations / writes. If we were to replace the array list then we'd need to add a synchronized to guard the listenersPlusTimers field itself.

Given the workload and access patterns here, I'm not sure that it's worth it to attempt to optimize this removeListener() method any further.


// Post a message to the listener bus and wait for processing to begin:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
Contributor

is it deterministic that this line will run before listenerStarted.release() in onJobEnd?

Contributor

actually the order doesn't matter: if release is called first, acquire won't block

Contributor Author

This should be fine:

  • If this code runs before listenerStarted.release() then it will block until listenerStarted.release() is hit.
  • The listener will block in listenerWait.acquire() until we call listenerWait.release() further down in this method.

This synchronization pattern is already used elsewhere in this suite in https://github.com/JoshRosen/spark/blob/76b669ca6eb35a0cce4291702baa5d1f60adb467/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala#L113
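For readers unfamiliar with the pattern, the handshake boils down to the following sketch (listener and variable names are illustrative):

import java.util.concurrent.Semaphore

val listenerStarted = new Semaphore(0)
val listenerWait = new Semaphore(0)

val blockingListener = new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    listenerStarted.release() // signal that processing of the first event has begun
    listenerWait.acquire()    // block the bus thread until the test releases it
  }
}

bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()      // safe in either order: a prior release() makes this return immediately
// ... post more events and assert on queue size / dropped-event counters here ...
listenerWait.release()         // unblock the listener so the bus can drain the queue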

@cloud-fan
Contributor

LGTM except one question

@SparkQA

SparkQA commented Jun 9, 2017

Test build #77821 has finished for PR 18083 at commit 76b669c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 2a23cdd Jun 9, 2017
@JoshRosen JoshRosen deleted the listener-bus-metrics branch June 9, 2017 01:14