
[SPARK-15703] [Scheduler][Core][WebUI] Make ListenerBus event queue size configurable #14269


Closed
wants to merge 6 commits

Conversation

@dhruve (Contributor) commented Jul 19, 2016

What changes were proposed in this pull request?

This change adds a new configuration entry to specify the size of the Spark listener bus event queue. The value for this config ("spark.scheduler.listenerbus.eventqueue.size") defaults to 10000.

Note:
I haven't documented the configuration entry yet. We can decide whether it would be appropriate to make it a public configuration or keep it as an undocumented one. Refer to the JIRA for more details.

How was this patch tested?

Ran existing jobs and verified the event queue size with debug logs and from the Spark WebUI Environment tab.
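For illustration, a minimal sketch (not part of this PR) of how an application could set the new entry from its own code; the app name, master, and the value 20000 are purely illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Raise the listener bus queue capacity above the default of 10000 to reduce
// dropped events when listeners are slow.
val conf = new SparkConf()
  .setAppName("listener-bus-queue-example")
  .setMaster("local[*]")
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000")
val sc = new SparkContext(conf)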

@tgravescs (Contributor)

Ok to test

@tgravescs (Contributor)

Jenkins, test this please

private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf.
getInt("spark.scheduler.listenerbus.eventqueue.size", 10000)
Review comment (Contributor):

should use a ConfigEntry for this
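For context, a hedged sketch of what the suggested ConfigEntry could look like; this mirrors what the later commits in this PR end up doing, but the exact wording here is illustrative:

// In the org.apache.spark.internal.config package object:
private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
    .intConf
    .createWithDefault(10000)

// Callers then read the typed entry instead of a raw string key, e.g.
// sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)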

@SparkQA commented Jul 19, 2016

Test build #62548 has finished for PR 14269 at commit 9c0cb44.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

Jenkins, this is okay to test

@tgravescs (Contributor)

Jenkins, test this please

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
.transform((x: Int) => {
Review comment (Contributor):

The transform wasn't really meant to be a validator. We had discussed adding validator functionality to the Config stuff, but I think that should be done separately. So for now let's pull this out and just do the check in the LiveListenerBus.

Author reply:

Agreed. The method is intended for something else. Adding validator functionality to the Config system would be good to do. I will make the changes and fix the tests.
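A minimal sketch of what the check inside LiveListenerBus could look like; the helper name validateAndGetQueueSize and the error message are illustrative rather than a quote of the merged code:

// Inside LiveListenerBus (assumes java.util.concurrent.LinkedBlockingQueue,
// org.apache.spark.SparkException and the LISTENER_BUS_EVENT_QUEUE_SIZE entry
// are in scope). The queue stays lazy so the conf is read only after the
// SparkContext is fully constructed.
private lazy val eventQueue =
  new LinkedBlockingQueue[SparkListenerEvent](validateAndGetQueueSize())

private def validateAndGetQueueSize(): Int = {
  val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
  if (queueSize <= 0) {
    throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
  }
  queueSize
}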

@SparkQA commented Jul 20, 2016

Test build #62613 has finished for PR 14269 at commit 76d9af8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

Jenkins, test this please

@dhruve (Contributor, Author) commented Jul 21, 2016

Jenkins, okay to test

@dhruve (Contributor, Author) commented Jul 22, 2016

@vanzin, @srowen Can you kick off the Jenkins build? Thanks.

@vanzin (Contributor) commented Jul 22, 2016

ok to test

@vanzin (Contributor) commented Jul 22, 2016

retest this please

@SparkQA commented Jul 22, 2016

Test build #62733 has finished for PR 14269 at commit 889fe66.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 22, 2016

Test build #62732 has finished for PR 14269 at commit 889fe66.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dhruve (Contributor, Author) commented Jul 22, 2016

The failures from Test build #62733 are unrelated.

@@ -19,6 +19,7 @@ package org.apache.spark.internal

import java.util.concurrent.TimeUnit

import org.apache.spark.SparkException
Review comment (Contributor):

Remove this; I think it is unused now.

@tgravescs (Contributor)

One minor issue with an import; otherwise the changes LGTM.

@dhruve (Contributor, Author) commented Jul 25, 2016

Removed unused import.

@SparkQA commented Jul 25, 2016

Test build #62842 has finished for PR 14269 at commit 82feec4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit closed this in 0b71d9a on Jul 26, 2016
asfgit pushed a commit that referenced this pull request Sep 23, 2016
…ze configurable (branch 2.0)

## What changes were proposed in this pull request?

Backport #14269 to 2.0.

## How was this patch tested?

Jenkins.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #15222 from zsxwing/SPARK-15703-2.0.
@@ -32,18 +33,24 @@ import org.apache.spark.util.Utils
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
private[spark] class LiveListenerBus extends SparkListenerBus {
private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
Review comment (Contributor):

I'm modifying LiveListenerBus now and noticed that we're passing in sparkContext even though we only use it to access conf. I think it would have been better to just pass in conf here. It would make the initialization order constraints a lot clearer, too: right now it's not immediately clear why eventQueue needs to be a lazy val.

Review comment (Contributor):

I guess we also use this to tear down sparkContext when the listener dies. I still have a weak preference for the old code, however, since I think the lifecycle was clearer.

Author reply:

From what I recall, and as you mentioned, a reference to sparkContext was already being used to tear it down when the listener died because of an uncaught exception. The idea was to refactor the code to make sparkContext available at instantiation time and access the conf from it rather than passing it separately. This definition is cyclic, and a lazy val was used to ensure that the conf is accessed only after sc is initialized.
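A self-contained analogue of the cycle being described; the names Context and Bus are illustrative, not Spark's own classes:

import java.util.concurrent.LinkedBlockingQueue

// Context builds the bus and passes itself in before its own fields are ready,
// so Bus must not read context.conf in its constructor.
class Context {
  val bus = new Bus(this)   // conf below is still null at this point
  val conf = Map("spark.scheduler.listenerbus.eventqueue.size" -> "10000")
}

class Bus(context: Context) {
  // A plain val here would dereference context.conf during construction and
  // throw a NullPointerException; lazy defers the lookup until the first
  // post(), by which time Context has finished initializing.
  private lazy val capacity =
    context.conf("spark.scheduler.listenerbus.eventqueue.size").toInt
  private lazy val queue = new LinkedBlockingQueue[String](capacity)
  def post(event: String): Boolean = queue.offer(event)
}

In this analogue, new Context().bus.post("event") succeeds, whereas making capacity a strict val would fail while constructing Context, which is the same ordering constraint that forces the lazy val in LiveListenerBus.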

@@ -47,6 +47,7 @@ class ReceivedBlockHandlerSuite
extends SparkFunSuite
with BeforeAndAfter
with Matchers
with LocalSparkContext
Review comment (Contributor):

The fact that this change was necessary is another weird code smell which suggests to me that putting SparkContext into the constructor was not a good idea.

@@ -103,4 +103,9 @@ package object config {
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
@JoshRosen (Contributor), May 25, 2017:

Another post-hoc review/complaint: I think that size might be misleading in this context where we're talking about a queue, since the size of a queue refers to the number of elements currently in the queue while its capacity refers to the maximum size that the queue can reach. This configuration name caused confusion in https://github.com/apache/spark/pull/18083/files/378206efb9f5c9628a678ba7defb536252f5cbcb#r118413115

Instead, it might have been better to call it capacity.

Author reply:

I agree. Capacity would have been a better choice.

asfgit pushed a commit that referenced this pull request May 26, 2017
## What changes were proposed in this pull request?

`ConfigBuilder` builds a `ConfigEntry` that can only read its value from one key, so if we want to change a config name while still honoring the old one, it's hard to do.

This PR introduces `ConfigBuilder.withAlternative` to support reading a config value from alternative keys, and also renames `spark.scheduler.listenerbus.eventqueue.size` to `spark.scheduler.listenerbus.eventqueue.capacity` with this feature, according to #14269 (comment). A sketch of the resulting definition follows this commit message.

## How was this patch tested?

a new test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18110 from cloud-fan/config.
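A hedged sketch of the renamed entry with the old key kept as an alternative; the exact definition in Spark's config package object may differ slightly:

private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
    .withAlternative("spark.scheduler.listenerbus.eventqueue.size")
    .intConf
    .createWithDefault(10000)

// Reads resolve "...eventqueue.capacity" first and fall back to the legacy
// "...eventqueue.size" key, so existing jobs that set the old name keep working.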