[SPARK-15703] [Scheduler][Core][WebUI] Make ListenerBus event queue size configurable #14269
Conversation
Ok to test
Jenkins, test this please
-private val EVENT_QUEUE_CAPACITY = 10000
-private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf.
+  getInt("spark.scheduler.listenerbus.eventqueue.size", 10000)
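For context, a `LinkedBlockingQueue` constructed with a fixed capacity is what makes the bus drop events under load. A minimal stand-alone sketch of that behavior (plain Java rather than Spark's Scala; the class and method names are illustrative, not Spark's):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class QueueDemo {
    // Fill a bounded queue with more events than it can hold and count drops.
    static int[] fill(int capacity, int events) {
        LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(capacity);
        int dropped = 0;
        for (int i = 0; i < events; i++) {
            // offer() returns false instead of blocking when the queue is full,
            // which is how a bounded listener bus ends up dropping events
            if (!eventQueue.offer("event-" + i)) {
                dropped++;
            }
        }
        return new int[] { eventQueue.size(), dropped };
    }

    public static void main(String[] args) {
        int[] r = fill(3, 5);
        System.out.println("queued=" + r[0] + " dropped=" + r[1]);
    }
}
```

Raising the configured capacity trades memory for fewer dropped events, which is exactly why the PR makes it tunable.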
should use a ConfigEntry for this
Test build #62548 has finished for PR 14269 at commit
Jenkins, this is okay to test
Jenkins, test this please
private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
    .intConf
    .transform((x: Int) => {
The transform wasn't really meant to be a validator. We had discussed adding validator functionality to the Config stuff, but I think that should be done separately. So for now, let's pull this out and just do the check in the LiveListenerBus.
Agreed. The method is intended for something else; having proper validator functionality here would be a good addition. I will make the changes and fix the tests.
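A sketch of what "do the check in the LiveListenerBus" could look like, moving validation out of the config transform and to the point of use (a hypothetical helper in plain Java, not the PR's actual code):

```java
public class BusConfig {
    // Validate the configured queue capacity where it is consumed,
    // rather than inside a ConfigBuilder transform.
    static int validatedCapacity(int configured) {
        if (configured <= 0) {
            throw new IllegalArgumentException(
                "spark.scheduler.listenerbus.eventqueue.size must be > 0, got " + configured);
        }
        return configured;
    }
}
```

Failing fast at construction time surfaces a bad setting immediately instead of letting a zero or negative capacity blow up inside the queue constructor.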
Test build #62613 has finished for PR 14269 at commit
Jenkins, test this please
Jenkins, okay to test
ok to test
retest this please
Test build #62733 has finished for PR 14269 at commit
Test build #62732 has finished for PR 14269 at commit
The set of failures from Test build #62733 is unrelated.
@@ -19,6 +19,7 @@ package org.apache.spark.internal

 import java.util.concurrent.TimeUnit

+import org.apache.spark.SparkException
Remove; I think this is unused now.
One minor issue with an import; otherwise the changes LGTM.
Removed the unused import.
Test build #62842 has finished for PR 14269 at commit
…ze configurable (branch 2.0)

## What changes were proposed in this pull request?

Backport #14269 to 2.0.

## How was this patch tested?

Jenkins.

Author: Dhruve Ashar <dhruveashar@gmail.com>
Closes #15222 from zsxwing/SPARK-15703-2.0.
@@ -32,18 +33,24 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
-private[spark] class LiveListenerBus extends SparkListenerBus {
+private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
I'm modifying LiveListenerBus now and noticed that we're passing in `sparkContext` even though we only use it to access `conf`. I think it would have been better to just pass in `conf` here. It would make the initialization order constraints a lot clearer, too: right now it's not immediately clear why `eventQueue` needs to be a `lazy val`.
I guess we also use this to tear down `sparkContext` when the listener dies. I still have a weak preference for the old code, however, since I think the lifecycle was clearer.
From what I recall, and as you mentioned, a reference to `sparkContext` was already being used to tear down when the listener died because of an uncaught exception. The idea was to refactor the code to make `sparkContext` available at instantiation time and access the conf from it rather than passing it separately. This definition is cyclic, and `lazy val` was used to ensure that the conf was accessed only after `sc` was initialized.
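The lazy-initialization pattern being described here, deferring the conf read until first access so it happens after the context has finished constructing, can be sketched as follows (an illustrative plain-Java stand-in for Scala's `lazy val`; names are hypothetical, not Spark's):

```java
import java.util.function.Supplier;

public class LazyCapacity {
    private final Supplier<Integer> confReader; // reads the conf on demand
    private Integer capacity; // null until first access, like a Scala lazy val

    LazyCapacity(Supplier<Integer> confReader) {
        this.confReader = confReader;
    }

    // The conf is consulted only here, on first use, not at construction time.
    // This is what breaks a cyclic initialization dependency between the
    // context and the bus: the bus can be constructed before the conf exists.
    synchronized int capacity() {
        if (capacity == null) {
            capacity = confReader.get();
        }
        return capacity;
    }
}
```

The trade-off the reviewers point out above is that this pattern hides the initialization order in the field's laziness rather than making it explicit in the constructor signature.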
@@ -47,6 +47,7 @@ class ReceivedBlockHandlerSuite
   extends SparkFunSuite
   with BeforeAndAfter
   with Matchers
+  with LocalSparkContext
The fact that this change was necessary is another weird code smell which suggests to me that putting SparkContext into the constructor was not a good idea.
@@ -103,4 +103,9 @@ package object config {
     .stringConf
     .checkValues(Set("hive", "in-memory"))
     .createWithDefault("in-memory")

+  private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
Another post-hoc review/complaint: I think that `size` might be misleading in this context where we're talking about a queue, since the size of a queue refers to the number of elements currently in the queue, while its capacity refers to the maximum size that the queue can reach. This configuration name caused confusion in https://github.com/apache/spark/pull/18083/files/378206efb9f5c9628a678ba7defb536252f5cbcb#r118413115. Instead, it might have been better to call it `capacity`.
I agree. Capacity would have been a better choice.
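The size/capacity distinction being discussed is easy to see directly on `LinkedBlockingQueue` (plain Java, illustrative only):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class SizeVsCapacity {
    // Returns {size, remainingCapacity} after queuing two elements.
    static int[] stats(int capacity) {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(capacity);
        q.offer(1);
        q.offer(2);
        // size() counts elements currently queued; remainingCapacity() is the
        // room left before offers start failing. "Capacity" is the fixed bound.
        return new int[] { q.size(), q.remainingCapacity() };
    }

    public static void main(String[] args) {
        int[] s = stats(10);
        System.out.println("size=" + s[0] + " remaining=" + s[1]);
    }
}
```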
## What changes were proposed in this pull request?

`ConfigBuilder` builds a `ConfigEntry`, which can only read a value with one key; if we want to change a config name but still keep the old one, that's hard to do. This PR introduces `ConfigBuilder.withAlternative` to support reading a config value with alternative keys. It also renames `spark.scheduler.listenerbus.eventqueue.size` to `spark.scheduler.listenerbus.eventqueue.capacity` using this feature, according to #14269 (comment).

## How was this patch tested?

A new test.

Author: Wenchen Fan <wenchen@databricks.com>
Closes #18110 from cloud-fan/config.
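The alternative-key lookup that `withAlternative` enables can be sketched like this (a hypothetical plain-Java lookup; the real logic lives in Spark's ConfigEntry machinery):

```java
import java.util.Map;

public class AltKeyConf {
    // Prefer the new key, fall back to the deprecated one, then the default.
    static int queueCapacity(Map<String, String> conf) {
        String v = conf.get("spark.scheduler.listenerbus.eventqueue.capacity");
        if (v == null) {
            v = conf.get("spark.scheduler.listenerbus.eventqueue.size"); // old name
        }
        return v == null ? 10000 : Integer.parseInt(v); // 10000 is the PR's default
    }
}
```

This keeps jobs that still set the old key working while letting the better-named key take precedence when both are present.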
What changes were proposed in this pull request?
This change adds a new configuration entry to specify the size of the Spark listener bus event queue. The value for this config (`spark.scheduler.listenerbus.eventqueue.size`) defaults to 10000.
Note:
I haven't documented the configuration entry yet. We can decide whether it would be appropriate to make it a public configuration or keep it undocumented. Refer to the JIRA for more details.
How was this patch tested?
Ran existing jobs and verified the event queue size with debug logs and from the Spark WebUI Environment tab.