Skip to content

[SPARK-15703] [Scheduler][Core][WebUI] Make ListenerBus event queue size configurable #14269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def isStopped: Boolean = stopped.get()

// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus
private[spark] val listenerBus = new LiveListenerBus(this)

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
Expand Down Expand Up @@ -2147,7 +2147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

listenerBus.start(this)
listenerBus.start()
_listenerBusStarted = true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ package object config {
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
Copy link
Contributor

@JoshRosen JoshRosen May 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another post-hoc review/complaint: I think that size might be misleading in this context where we're talking about a queue, since the size of a queue refers to the number of elements currently in the queue while its capacity refers to the maximum size that the queue can reach. This configuration name caused confusion in https://github.com/apache/spark/pull/18083/files/378206efb9f5c9628a678ba7defb536252f5cbcb#r118413115

Instead, it might have been better to call it capacity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Capacity would have been a better choice.

.intConf
.createWithDefault(10000)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean

import scala.util.DynamicVariable

import org.apache.spark.SparkContext
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

/**
Expand All @@ -32,18 +33,24 @@ import org.apache.spark.util.Utils
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
private[spark] class LiveListenerBus extends SparkListenerBus {
private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm modifying LiveListenerBus now and noticed that we're passing in sparkContext even though we only use it to access conf. I think it would have been better to just pass in conf here. It would make the initialization order constraints a lot clearer, too: right now it's not immediately clear why eventQueue needs to be a lazy val.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we also use this to tear down sparkContext when the listener dies. I still have a weak preference for the old code, however, since I think the lifecycle was clearer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I recall and as you mentioned, a ref to sparkContext was already being used to tear down when the listener died because of an uncaught Exception. The idea was to refactor the code to make sparkContext available at instantiation time and access the conf from it rather than passing it separately. This defn is cyclic and lazy val was used to ensure that conf was accessed only after sc was intialized.


self =>

import LiveListenerBus._

private var sparkContext: SparkContext = null

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

private def validateAndGetQueueSize(): Int = {
val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
if (queueSize <= 0) {
throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
}
queueSize
}

// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
Expand Down Expand Up @@ -96,11 +103,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus {
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
*
* @param sc Used to stop the SparkContext in case the listener thread dies.
*/
def start(sc: SparkContext): Unit = {
def start(): Unit = {
if (started.compareAndSet(false, true)) {
sparkContext = sc
listenerThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val listenerBus = new LiveListenerBus(sc)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)

// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
eventLogger.start()
listenerBus.start(sc)
listenerBus.start()
listenerBus.addListener(eventLogger)
listenerBus.postToAll(applicationStart)
listenerBus.postToAll(applicationEnd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val jobCompletionTime = 1421191296660L

test("don't call sc.stop in listener") {
sc = new SparkContext("local", "SparkListenerSuite")
sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val listener = new SparkContextStoppingListener(sc)
val bus = new LiveListenerBus
val bus = new LiveListenerBus(sc)
bus.addListener(listener)

// Starting listener bus should flush all buffered events
bus.start(sc)
bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

Expand All @@ -52,16 +52,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}

test("basic creation and shutdown of LiveListenerBus") {
sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val counter = new BasicJobCounter
val bus = new LiveListenerBus
val bus = new LiveListenerBus(sc)
bus.addListener(counter)

// Listener bus hasn't started yet, so posting events should not increment counter
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(counter.count === 0)

// Starting listener bus should flush all buffered events
bus.start(sc)
bus.start()
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)

Expand All @@ -72,14 +73,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

// Listener bus must not be started twice
intercept[IllegalStateException] {
val bus = new LiveListenerBus
bus.start(sc)
bus.start(sc)
val bus = new LiveListenerBus(sc)
bus.start()
bus.start()
}

// ... or stopped before starting
intercept[IllegalStateException] {
val bus = new LiveListenerBus
val bus = new LiveListenerBus(sc)
bus.stop()
}
}
Expand All @@ -106,12 +107,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
drained = true
}
}

val bus = new LiveListenerBus
sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val bus = new LiveListenerBus(sc)
val blockingListener = new BlockingListener

bus.addListener(blockingListener)
bus.start(sc)
bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))

listenerStarted.acquire()
Expand Down Expand Up @@ -353,13 +354,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val badListener = new BadListener
val jobCounter1 = new BasicJobCounter
val jobCounter2 = new BasicJobCounter
val bus = new LiveListenerBus
sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val bus = new LiveListenerBus(sc)

// Propagate events to bad listener first
bus.addListener(badListener)
bus.addListener(jobCounter1)
bus.addListener(jobCounter2)
bus.start(sc)
bus.start()

// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._

/** Testsuite that tests block replication in BlockManager */
class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
class BlockManagerReplicationSuite extends SparkFunSuite
with Matchers
with BeforeAndAfter
with LocalSparkContext {

private val conf = new SparkConf(false).set("spark.app.id", "test")
private var rpcEnv: RpcEnv = null
Expand Down Expand Up @@ -91,8 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
// to make cached peers refresh frequently
conf.set("spark.storage.cachedPeersTtl", "10")

sc = new SparkContext("local", "test", conf)
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
new LiveListenerBus(sc))), conf, true)
allStores.clear()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer

class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
with PrivateMethodTester with ResetSystemProperties {
with PrivateMethodTester with LocalSparkContext with ResetSystemProperties {

import BlockManagerSuite._

Expand Down Expand Up @@ -107,8 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)

sc = new SparkContext("local", "test", conf)
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
new LiveListenerBus(sc))), conf, true)

val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package org.apache.spark.ui.storage

import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite, Success}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark._
import org.apache.spark.scheduler._
import org.apache.spark.storage._

/**
* Test various functionality in the StorageListener that supports the StorageTab.
*/
class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
private var bus: LiveListenerBus = _
private var storageStatusListener: StorageStatusListener = _
private var storageListener: StorageListener = _
Expand All @@ -43,8 +42,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
private val bm1 = BlockManagerId("big", "dog", 1)

before {
bus = new LiveListenerBus
storageStatusListener = new StorageStatusListener(new SparkConf())
val conf = new SparkConf()
sc = new SparkContext("local", "test", conf)
bus = new LiveListenerBus(sc)
storageStatusListener = new StorageStatusListener(conf)
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ReceivedBlockHandlerSuite
extends SparkFunSuite
with BeforeAndAfter
with Matchers
with LocalSparkContext
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that this change was necessary is another weird code smell which suggests to me that putting SparkContext into the constructor was not a good idea.

with Logging {

import WriteAheadLogBasedBlockHandler._
Expand Down Expand Up @@ -77,8 +78,10 @@ class ReceivedBlockHandlerSuite
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)

sc = new SparkContext("local", "test", conf)
blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
new LiveListenerBus(sc))), conf, true)

storageLevel = StorageLevel.MEMORY_ONLY_SER
blockManager = createBlockManager(blockManagerSize, conf)
Expand Down