Commit 6de06b0

Merge remote-tracking branch 'apache/master' into streaming-web-ui
Conflicts:
    core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
    core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
    core/src/main/scala/org/apache/spark/ui/SparkUI.scala
    core/src/main/scala/org/apache/spark/ui/WebUI.scala
    core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala
    core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala
    core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
    core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
    core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
    core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
    core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala
    core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
    core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
2 parents: ee6543f + 3bd3129

101 files changed: +1150 −310 lines


bin/spark-class

Lines changed: 6 additions & 2 deletions
@@ -47,9 +47,9 @@ DEFAULT_MEM=${SPARK_MEM:-512m}
 
 SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
 
-# Add java opts and memory settings for master, worker, executors, and repl.
+# Add java opts and memory settings for master, worker, history server, executors, and repl.
 case "$1" in
-  # Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+  # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
   'org.apache.spark.deploy.master.Master')
     OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
     OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
@@ -58,6 +58,10 @@ case "$1" in
     OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
     OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
     ;;
+  'org.apache.spark.deploy.history.HistoryServer')
+    OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
+    OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
+    ;;
 
   # Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
   'org.apache.spark.executor.CoarseGrainedExecutorBackend')

bin/spark-class2.cmd

Lines changed: 5 additions & 2 deletions
@@ -45,14 +45,17 @@ if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m
 
 set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
 
-rem Add java opts and memory settings for master, worker, executors, and repl.
-rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+rem Add java opts and memory settings for master, worker, history server, executors, and repl.
+rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
 if "%1"=="org.apache.spark.deploy.master.Master" (
   set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
   if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
 ) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
   set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
   if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
+) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
+  set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
+  if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
 
 rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
 ) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (

core/src/main/scala/org/apache/spark/Logging.scala

Lines changed: 16 additions & 4 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
  * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
  * logging messages at different levels using methods that only evaluate parameters lazily if the
  * log level is enabled.
- * 
+ *
  * NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility.
  * This will likely be changed or removed in future releases.
  */
@@ -60,7 +60,7 @@ trait Logging {
   protected def logDebug(msg: => String) {
     if (log.isDebugEnabled) log.debug(msg)
   }
-  
+
   protected def logTrace(msg: => String) {
     if (log.isTraceEnabled) log.trace(msg)
   }
@@ -117,10 +117,10 @@ trait Logging {
       val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
       val classLoader = this.getClass.getClassLoader
       Option(classLoader.getResource(defaultLogProps)) match {
-        case Some(url) => 
+        case Some(url) =>
           PropertyConfigurator.configure(url)
           log.info(s"Using Spark's default log4j profile: $defaultLogProps")
-        case None => 
+        case None =>
          System.err.println(s"Spark was unable to load $defaultLogProps")
       }
     }
@@ -135,4 +135,16 @@
 private object Logging {
   @volatile private var initialized = false
   val initLock = new Object()
+  try {
+    // We use reflection here to handle the case where users remove the
+    // slf4j-to-jul bridge order to route their logs to JUL.
+    val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
+    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
+    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
+    if (!installed) {
+      bridgeClass.getMethod("install").invoke(null)
+    }
+  } catch {
+    case e: ClassNotFoundException => // can't log anything yet so just fail silently
+  }
 }
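
Note: for readers unfamiliar with the SLF4J bridge, the reflective block added above is equivalent to the following direct calls. This is only a sketch assuming the org.slf4j:jul-to-slf4j artifact is on the classpath; the commit uses reflection precisely so that nothing breaks when it is not.

import org.slf4j.bridge.SLF4JBridgeHandler

object JulBridgeExample {
  def installBridge(): Unit = {
    // Remove any existing java.util.logging root handlers and install the
    // bridge so JUL records are routed through SLF4J (and on to log4j).
    SLF4JBridgeHandler.removeHandlersForRootLogger()
    if (!SLF4JBridgeHandler.isInstalled()) {
      SLF4JBridgeHandler.install()
    }
  }
}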

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 24 additions & 6 deletions
@@ -219,15 +219,12 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
       val logger = new EventLoggingListener(appName, conf)
+      logger.start()
       listenerBus.addListener(logger)
       Some(logger)
     } else None
   }
 
-  // Information needed to replay logged events, if any
-  private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
-    eventLogger.map { logger => Some(logger.info) }.getOrElse(None)
-
   // At this point, all relevant SparkListeners have been registered, so begin releasing events
   listenerBus.start()
 
@@ -292,6 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
   cleaner.foreach(_.start())
 
   postEnvironmentUpdate()
+  postApplicationStart()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration: Configuration = {
@@ -777,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging {
     listenerBus.addListener(listener)
   }
 
+  /** The version of Spark on which this application is running. */
+  def version = SparkContext.SPARK_VERSION
+
   /**
    * Return a map from the slave to the max memory available for caching and the remaining
    * memory available for caching.
@@ -930,8 +931,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /** Shut down the SparkContext. */
   def stop() {
+    postApplicationEnd()
     ui.stop()
-    eventLogger.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
@@ -940,13 +941,14 @@ class SparkContext(config: SparkConf) extends Logging {
       metadataCleaner.cancel()
      cleaner.foreach(_.stop())
       dagSchedulerCopy.stop()
-      listenerBus.stop()
       taskScheduler = null
       // TODO: Cache.stop()?
       env.stop()
       SparkEnv.set(null)
       ShuffleMapTask.clearCache()
       ResultTask.clearCache()
+      listenerBus.stop()
+      eventLogger.foreach(_.stop())
       logInfo("Successfully stopped SparkContext")
     } else {
       logInfo("SparkContext already stopped")
@@ -1175,6 +1177,20 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
 
+  /** Post the application start event */
+  private def postApplicationStart() {
+    listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+  }
+
+  /**
+   * Post the application end event to all listeners immediately, rather than adding it
+   * to the event queue for it to be asynchronously processed eventually. Otherwise, a race
+   * condition exists in which the listeners may stop before this event has been propagated.
+   */
+  private def postApplicationEnd() {
+    listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
+  }
+
   /** Post the environment update event once the task scheduler is ready */
   private def postEnvironmentUpdate() {
     if (taskScheduler != null) {
@@ -1200,6 +1216,8 @@ class SparkContext(config: SparkConf) extends Logging {
  */
 object SparkContext extends Logging {
 
+  private[spark] val SPARK_VERSION = "1.0.0"
+
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
 
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
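
Note: a rough sketch of how the new application lifecycle events could be consumed from the listener bus. It assumes the SparkListener trait gained matching onApplicationStart/onApplicationEnd callbacks for these events; the class name AppLifecycleLogger is made up for illustration.

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

// Hypothetical listener that records the new lifecycle events posted by
// postApplicationStart() and postApplicationEnd() above.
class AppLifecycleLogger extends SparkListener {
  override def onApplicationStart(start: SparkListenerApplicationStart) {
    println(s"Application ${start.appName} started at ${start.time} by ${start.sparkUser}")
  }

  override def onApplicationEnd(end: SparkListenerApplicationEnd) {
    println(s"Application ended at ${end.time}")
  }
}

// Registration would go through the existing public hook:
//   sc.addSparkListener(new AppLifecycleLogger)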

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 1 addition & 3 deletions
@@ -17,16 +17,14 @@
 
 package org.apache.spark.deploy
 
-import org.apache.spark.scheduler.EventLoggingInfo
-
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
     val sparkHome: Option[String],
     var appUiUrl: String,
-    val eventLogInfo: Option[EventLoggingInfo] = None)
+    val eventLogDir: Option[String] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
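
Note: a hedged usage sketch tying the pieces together. With event logging enabled via SparkConf, the SparkContext change above starts an EventLoggingListener, and the configured directory is presumably what the standalone deploy code now carries in ApplicationDescription's eventLogDir field. The spark.eventLog.dir setting, master URL, and paths below are placeholder assumptions for illustration.

import org.apache.spark.{SparkConf, SparkContext}

object EventLogExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("event-log-example")
      .setMaster("local[2]")
      .set("spark.eventLog.enabled", "true")          // enables the EventLoggingListener shown above
      .set("spark.eventLog.dir", "/tmp/spark-events") // assumed event log directory setting

    val sc = new SparkContext(conf)
    println(s"Running on Spark ${sc.version}") // version accessor added in this commit
    sc.stop()                                  // now posts SparkListenerApplicationEnd before tearing down
  }
}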
