-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-1276] Add a HistoryServer to render persisted UI #204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c086bd5
8aac163
7584418
60bc6d5
5dbfbb4
d5154da
a9eae7e
1b2f391
0670743
81b568b
050419e
e2f4ff9
bc46fc8
a3598de
248cb3d
19e1fb4
6edf052
567474a
2282300
d02dbaa
2dfb494
f7f5bf0
19d5dd0
69d1b41
b158d98
7b7234c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -219,15 +219,12 @@ class SparkContext(config: SparkConf) extends Logging { | |
private[spark] val eventLogger: Option[EventLoggingListener] = { | ||
if (conf.getBoolean("spark.eventLog.enabled", false)) { | ||
val logger = new EventLoggingListener(appName, conf) | ||
logger.start() | ||
listenerBus.addListener(logger) | ||
Some(logger) | ||
} else None | ||
} | ||
|
||
// Information needed to replay logged events, if any | ||
private[spark] val eventLoggingInfo: Option[EventLoggingInfo] = | ||
eventLogger.map { logger => Some(logger.info) }.getOrElse(None) | ||
|
||
// At this point, all relevant SparkListeners have been registered, so begin releasing events | ||
listenerBus.start() | ||
|
||
|
@@ -292,6 +289,7 @@ class SparkContext(config: SparkConf) extends Logging { | |
cleaner.foreach(_.start()) | ||
|
||
postEnvironmentUpdate() | ||
postApplicationStart() | ||
|
||
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ | ||
val hadoopConfiguration: Configuration = { | ||
|
@@ -777,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging { | |
listenerBus.addListener(listener) | ||
} | ||
|
||
/** The version of Spark on which this application is running. */ | ||
def version = SparkContext.SPARK_VERSION | ||
|
||
/** | ||
* Return a map from the slave to the max memory available for caching and the remaining | ||
* memory available for caching. | ||
|
@@ -930,6 +931,7 @@ class SparkContext(config: SparkConf) extends Logging { | |
|
||
/** Shut down the SparkContext. */ | ||
def stop() { | ||
postApplicationEnd() | ||
ui.stop() | ||
// Do this only if not stopped already - best case effort. | ||
// prevent NPE if stopped more than once. | ||
|
@@ -1175,6 +1177,20 @@ class SparkContext(config: SparkConf) extends Logging { | |
/** Register a new RDD, returning its RDD ID */ | ||
private[spark] def newRddId(): Int = nextRddId.getAndIncrement() | ||
|
||
/** Post the application start event */ | ||
private def postApplicationStart() { | ||
listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser)) | ||
} | ||
|
||
/** | ||
* Post the application end event to all listeners immediately, rather than adding it | ||
* to the event queue for it to be asynchronously processed eventually. Otherwise, a race | ||
* condition exists in which the listeners may stop before this event has been propagated. | ||
*/ | ||
private def postApplicationEnd() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't this create a different race condition where there are events in the queue that get dropped because of this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example say the user code does this: val x = rdd.count() Couldn't there be a stage completion event that is in the queue and gets skipped here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is different from the Shutdown event. The StageCompletion event will still be processed in the regular listener bus thread. It is true that the ApplicationEnd event may be processed before the StageCompletion event, however. A proper way of dealing with this is in #366, where the ordering is preserved but all events are guaranteed to be processed to completion. |
||
listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis)) | ||
} | ||
|
||
/** Post the environment update event once the task scheduler is ready */ | ||
private def postEnvironmentUpdate() { | ||
if (taskScheduler != null) { | ||
|
@@ -1200,6 +1216,8 @@ class SparkContext(config: SparkConf) extends Logging { | |
*/ | ||
object SparkContext extends Logging { | ||
|
||
private[spark] val SPARK_VERSION = "1.0.0" | ||
|
||
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" | ||
|
||
private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.deploy | ||
|
||
import org.apache.spark.ui.{SparkUI, WebUI} | ||
|
||
/**
 * A WebUI that can host the handlers of one or more SparkUIs.
 *
 * Both attach and detach operations require that this container has already
 * been bound to a server (i.e. bind() has been called), since they mutate the
 * server's root handler directly.
 */
private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) {

  /** Attach a SparkUI to this container. Only valid after bind(). */
  def attachUI(ui: SparkUI) {
    assert(serverInfo.isDefined,
      "%s must be bound to a server before attaching SparkUIs".format(name))
    val root = serverInfo.get.rootHandler
    ui.handlers.foreach { h =>
      root.addHandler(h)
      // Handlers added after the server has started must be started explicitly.
      if (!h.isStarted) {
        h.start()
      }
    }
  }

  /** Detach a SparkUI from this container. Only valid after bind(). */
  def detachUI(ui: SparkUI) {
    assert(serverInfo.isDefined,
      "%s must be bound to a server before detaching SparkUIs".format(name))
    val root = serverInfo.get.rootHandler
    ui.handlers.foreach { h =>
      // Stop each handler before removing it from the server's root handler.
      if (h.isStarted) {
        h.stop()
      }
      root.removeHandler(h)
    }
  }

}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have a small section of documentation about starting and stopping the history server, as well as about this setting for configuring its memory.