-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-20863] Add metrics/instrumentation to LiveListenerBus #18083
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
a1fb5a8
WIP
JoshRosen a46c247
Fix test compilation
JoshRosen 378206e
Mock in EventLoggingListenerSuite
JoshRosen 37a1a7d
Address Wenchen's review comments.
JoshRosen 3b713a3
Add per-listener timing statistics.
JoshRosen 60c7448
Protect against registering thousands of listener classes.
JoshRosen dcecdae
Merge remote-tracking branch 'origin/master' into listener-bus-metrics
JoshRosen 4a083de
Minor cleanups.
JoshRosen d1a5e99
Add test for per-listener-class timer; rename method.
JoshRosen b8164b2
Merge remote-tracking branch 'origin/master' into listener-bus-metrics
JoshRosen f36fbaa
Use Option.
JoshRosen 76b669c
Add configuration.
JoshRosen File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._ | |
import scala.reflect.ClassTag | ||
import scala.util.control.NonFatal | ||
|
||
import com.codahale.metrics.Timer | ||
|
||
import org.apache.spark.internal.Logging | ||
|
||
/** | ||
* An event bus which posts events to its listeners. | ||
*/ | ||
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { | ||
|
||
private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])] | ||
|
||
// Marked `private[spark]` for access in tests. | ||
private[spark] val listeners = new CopyOnWriteArrayList[L] | ||
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava | ||
|
||
/** | ||
* Returns a CodaHale metrics Timer for measuring the listener's event processing time. | ||
* This method is intended to be overridden by subclasses. | ||
*/ | ||
protected def getTimer(listener: L): Option[Timer] = None | ||
|
||
/** | ||
* Add a listener to listen events. This method is thread-safe and can be called in any thread. | ||
*/ | ||
final def addListener(listener: L): Unit = { | ||
listeners.add(listener) | ||
listenersPlusTimers.add((listener, getTimer(listener))) | ||
} | ||
|
||
/** | ||
* Remove a listener and it won't receive any events. This method is thread-safe and can be called | ||
* in any thread. | ||
*/ | ||
final def removeListener(listener: L): Unit = { | ||
listeners.remove(listener) | ||
listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer => | ||
listenersPlusTimers.remove(listenerAndTimer) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { | |
// JavaConverters can create a JIterableWrapper if we use asScala. | ||
// However, this method will be called frequently. To avoid the wrapper cost, here we use | ||
// Java Iterator directly. | ||
val iter = listeners.iterator | ||
val iter = listenersPlusTimers.iterator | ||
while (iter.hasNext) { | ||
val listener = iter.next() | ||
val listenerAndMaybeTimer = iter.next() | ||
val listener = listenerAndMaybeTimer._1 | ||
val maybeTimer = listenerAndMaybeTimer._2 | ||
val maybeTimerContext = if (maybeTimer.isDefined) { | ||
maybeTimer.get.time() | ||
} else { | ||
null | ||
} | ||
try { | ||
doPostEvent(listener, event) | ||
} catch { | ||
case NonFatal(e) => | ||
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) | ||
} finally { | ||
if (maybeTimerContext != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same. simpler with an option |
||
maybeTimerContext.stop() | ||
} | ||
} | ||
} | ||
} | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is a
CopyOnWriteArrayList
, shall we just do a filter and create a new array?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the only reason that
CopyOnWriteArrayList
was used was for thread-safety and fast performance for readers interleaved with very rare mutations / writes. If we were to replace the array list then we'd need to add asynchronized
to guard thelistenersPlusTimers
field itself.Given the workload and access patterns here, I'm not sure that it's worth it to attempt to optimize this
removeListener()
method any further.