Skip to content

Commit e09d66e

Browse files
author
Sital Kedia
committed
Support event listener group
1 parent df5b05e commit e09d66e

File tree

5 files changed

+72
-50
lines changed

5 files changed

+72
-50
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
2929
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
3030
import org.apache.spark.metrics.source.Source
3131
import org.apache.spark.scheduler._
32-
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
32+
import org.apache.spark.util.{Clock, ListenerEventExecutor, SystemClock, ThreadUtils, Utils}
3333

3434
/**
3535
* An agent that dynamically allocates and removes executors based on the workload.
@@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager(
217217
* the scheduling task.
218218
*/
219219
def start(): Unit = {
220-
listenerBus.addListener(listener)
220+
listenerBus.addListener(listener, ListenerEventExecutor.ExecutorAllocationManagerGroup)
221221

222222
val scheduleTask = new Runnable() {
223223
override def run(): Unit = {

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
6363
this(sc, new SystemClock)
6464
}
6565

66-
sc.addSparkListener(this)
66+
sc.listenerBus.addListener(this, ListenerEventExecutor.HeartBeatReceiverGroup)
6767

6868
override val rpcEnv: RpcEnv = sc.env.rpcEnv
6969

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ class SparkContext(config: SparkConf) extends Logging {
532532
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
533533
_conf, _hadoopConfiguration)
534534
logger.start()
535-
listenerBus.addListener(logger)
535+
listenerBus.addListener(logger, ListenerEventExecutor.EventLoggingGroup)
536536
Some(logger)
537537
} else {
538538
None
@@ -2329,7 +2329,7 @@ class SparkContext(config: SparkConf) extends Logging {
23292329
" parameter from breaking Spark's ability to find a valid constructor.")
23302330
}
23312331
}
2332-
listenerBus.addListener(listener)
2332+
listenerBus.addListener(listener, ListenerEventExecutor.DefaultUserEventListenerGroup)
23332333
logInfo(s"Registered listener $className")
23342334
}
23352335
} catch {

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ private[spark] object SparkUI {
178178
Utils.getContextOrSparkClassLoader).asScala
179179
listenerFactories.foreach { listenerFactory =>
180180
val listeners = listenerFactory.createListeners(conf, sparkUI)
181-
listeners.foreach(listenerBus.addListener)
181+
listeners.foreach(l => listenerBus.addListener(l))
182182
}
183183
sparkUI
184184
}

core/src/main/scala/org/apache/spark/util/ListenerBus.scala

Lines changed: 66 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
3030
import org.apache.spark.internal.Logging
3131
import org.apache.spark.scheduler.LiveListenerBus
3232

33-
34-
private class ListenerEventExecutor(listenerName: String, queueCapacity: Int) extends Logging {
33+
private class ListenerEventExecutor[L <: AnyRef] (listenerName: String, queueCapacity: Int)
34+
extends Logging {
3535
private val threadFactory = new ThreadFactoryBuilder().setDaemon(true)
3636
.setNameFormat(listenerName + "-event-executor")
3737
.build()
38+
val listeners = new CopyOnWriteArrayList[L]()
3839
/** Holds the events to be processed by this listener. */
3940
private val eventQueue = new LinkedBlockingQueue[Runnable](queueCapacity)
4041
/**
@@ -55,10 +56,18 @@ private class ListenerEventExecutor(listenerName: String, queueCapacity: Int) ex
5556
* guarantee that we do not process any event before starting the event executor.
5657
*/
5758
private val isStarted = new AtomicBoolean(false)
58-
private val lock = new ReentrantLock();
59+
private val lock = new ReentrantLock()
5960
/** Condition variable which is signaled once the event executor is started */
6061
private val startCondition: Condition = lock.newCondition
6162

63+
def addListener(listener: L): Unit = {
64+
listeners.add(listener)
65+
}
66+
67+
def removeListener(listener: L): Unit = {
68+
listeners.remove(listener)
69+
}
70+
6271
def start(): Unit = {
6372
isStarted.set(true)
6473
lock.lock()
@@ -133,7 +142,8 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
133142
// Cap the capacity of the event queue so we get an explicit error (rather than
134143
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
135144
protected def eventQueueSize = 10000
136-
private val listenerAndEventExecutors = new CopyOnWriteArrayList[(L, ListenerEventExecutor)]()
145+
private val eventGroupToEventExecutors =
146+
new ConcurrentHashMap[String, ListenerEventExecutor[L]] ()
137147

138148
// Indicate if `start()` is called
139149
private val started = new AtomicBoolean(false)
@@ -143,11 +153,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
143153
/**
144154
* Add a listener to listen events. This method is thread-safe and can be called in any thread.
145155
*/
146-
final def addListener(listener: L): Unit = {
147-
val eventProcessor = new ListenerEventExecutor(listener.getClass.getName, eventQueueSize)
148-
listenerAndEventExecutors.add((listener, eventProcessor))
156+
final def addListener(
157+
listener: L, eventListenerGroup: String = ListenerEventExecutor.DefaultEventListenerGroup):
158+
Unit = synchronized {
159+
var listenerEventExecutor = eventGroupToEventExecutors.get(eventListenerGroup)
160+
if (listenerEventExecutor == null) {
161+
listenerEventExecutor =
162+
new ListenerEventExecutor[L](listener.getClass.getName, eventQueueSize)
163+
eventGroupToEventExecutors.put(eventListenerGroup, listenerEventExecutor)
164+
165+
}
166+
listenerEventExecutor.addListener(listener)
149167
if (started.get()) {
150-
eventProcessor.start
168+
listenerEventExecutor.start
151169
}
152170
}
153171

@@ -156,14 +174,8 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
156174
* in any thread.
157175
*/
158176
final def removeListener(listener: L): Unit = {
159-
val iter = listenerAndEventExecutors.iterator()
160-
var index = 0
161-
while (iter.hasNext) {
162-
if (iter.next()._1 == listener) {
163-
listenerAndEventExecutors.remove(index)
164-
return
165-
}
166-
index = index + 1
177+
for (eventExecutor <- eventGroupToEventExecutors.values().asScala) {
178+
eventExecutor.removeListener(listener)
167179
}
168180
}
169181

@@ -172,10 +184,8 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
172184
* any of the existing listener
173185
*/
174186
def isListenerBusEmpty: Boolean = {
175-
val iter = listenerAndEventExecutors.iterator()
176-
while (iter.hasNext) {
177-
val listenerEvenProcessor = iter.next._2
178-
if (!listenerEvenProcessor.isEmpty) {
187+
for (eventExecutor <- eventGroupToEventExecutors.values().asScala) {
188+
if (!eventExecutor.isEmpty) {
179189
return false
180190
}
181191
}
@@ -188,19 +198,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
188198
* the {@link ListenerEventExecutor}.
189199
*/
190200
final def postToAll(event: E): Unit = {
191-
// JavaConverters can create a JIterableWrapper if we use asScala.
192-
// However, this method will be called frequently. To avoid the wrapper cost, here we use
193-
// Java Iterator directly.
194-
val iter = listenerAndEventExecutors.iterator()
195-
while (iter.hasNext) {
196-
val item = iter.next()
197-
val listener = item._1
198-
val listenerEventProcessor = item._2
201+
for (listenerEventProcessor <- eventGroupToEventExecutors.values().asScala) {
202+
// JavaConverters can create a JIterableWrapper if we use asScala.
203+
// However, this method will be called frequently. To avoid the wrapper cost, here we use
204+
// Java Iterator directly.
205+
val iter = listenerEventProcessor.listeners.iterator()
206+
while (iter.hasNext) {
207+
val listener = iter.next()
199208
listenerEventProcessor.submit(new Runnable {
200209
override def run(): Unit = Utils.tryLogNonFatalError {
201210
doPostEvent(listener, event)
202211
}
203212
})
213+
}
204214
}
205215
}
206216

@@ -210,15 +220,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
210220
* events.
211221
*/
212222
final def postToAllSync(event: E): Unit = {
213-
val iter = listenerAndEventExecutors.iterator()
214-
while (iter.hasNext) {
215-
val item = iter.next()
216-
val listener = item._1
217-
try {
218-
doPostEvent(listener, event)
219-
} catch {
220-
case NonFatal(e) =>
221-
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
223+
for (listenerEventProcessor <- eventGroupToEventExecutors.values().asScala) {
224+
// JavaConverters can create a JIterableWrapper if we use asScala.
225+
// However, this method will be called frequently. To avoid the wrapper cost, here we use
226+
// Java Iterator directly.
227+
val iter = listenerEventProcessor.listeners.iterator()
228+
while (iter.hasNext) {
229+
val listener = iter.next()
230+
try {
231+
doPostEvent(listener, event)
232+
} catch {
233+
case NonFatal(e) =>
234+
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
235+
}
222236
}
223237
}
224238
}
@@ -231,11 +245,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
231245

232246
private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
233247
val c = implicitly[ClassTag[T]].runtimeClass
234-
listenerAndEventExecutors.asScala.filter(_._1.getClass == c).map(_._1.asInstanceOf[T])
248+
listeners().toSeq.filter(_.getClass == c).map(_.asInstanceOf[T])
235249
}
236250

237251
private[spark] def listeners(): Seq[L] = {
238-
listenerAndEventExecutors.asScala.map(_._1)
252+
eventGroupToEventExecutors.values.asScala.map(l => l.listeners.asScala).flatten.toSeq
239253
}
240254

241255
/**
@@ -250,9 +264,8 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
250264
if (!started.compareAndSet(false, true)) {
251265
throw new IllegalStateException(s" already started!")
252266
}
253-
val iter = listenerAndEventExecutors.iterator()
254-
while (iter.hasNext) {
255-
iter.next()._2.start()
267+
for (eventExecutor <- eventGroupToEventExecutors.values().asScala) {
268+
eventExecutor.start()
256269
}
257270
}
258271

@@ -268,10 +281,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
268281
} else {
269282
// Keep quiet
270283
}
271-
val iter = listenerAndEventExecutors.iterator()
284+
val iter = eventGroupToEventExecutors.values().iterator()
272285
while (iter.hasNext) {
273-
iter.next()._2.stop()
286+
iter.next().stop()
274287
}
275288
}
276289
}
277290

291+
private[spark] object ListenerEventExecutor {
292+
val DefaultEventListenerGroup = "default-event-listener"
293+
val DefaultUserEventListenerGroup = "default-user-event-listener"
294+
val ExecutorAllocationManagerGroup = "executor-allocation-manager-listener"
295+
val HeartBeatReceiverGroup = "heart-beat-receiver-listener"
296+
val EventLoggingGroup = "event-logging-listener"
297+
// Allows for Context to check whether stop() call is made within listener thread
298+
}
299+

0 commit comments

Comments
 (0)