
Commit a95e7c8 (2 parents: 95c479d + af25838)

Merge remote-tracking branch 'upstream/master' into chouqin-dt-preprune

Conflicts:
    mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala

66 files changed: +969 −449 lines


bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 0 additions & 2 deletions
@@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark._
 import org.apache.spark.storage.StorageLevel
 
-import scala.language.postfixOps
-
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable

bin/compute-classpath.sh

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
   echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
     "classes ahead of assembly." >&2
   CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
+  CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
   CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"

core/pom.xml

Lines changed: 27 additions & 0 deletions
@@ -351,6 +351,33 @@
         </execution>
       </executions>
     </plugin>
+    <!--
+      Copy guava to the build directory. This is needed to make the SPARK_PREPEND_CLASSES
+      option work in compute-classpath.sh, since it would put the non-shaded Spark classes in
+      the runtime classpath.
+    -->
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-dependency-plugin</artifactId>
+      <executions>
+        <execution>
+          <id>copy-dependencies</id>
+          <phase>package</phase>
+          <goals>
+            <goal>copy-dependencies</goal>
+          </goals>
+          <configuration>
+            <outputDirectory>${project.build.directory}</outputDirectory>
+            <overWriteReleases>false</overWriteReleases>
+            <overWriteSnapshots>false</overWriteSnapshots>
+            <overWriteIfNewer>true</overWriteIfNewer>
+            <useSubDirectoryPerType>true</useSubDirectoryPerType>
+            <includeArtifactIds>guava</includeArtifactIds>
+            <silent>true</silent>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
   </plugins>
 
   <resources>

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 1 addition & 1 deletion
@@ -162,7 +162,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
 
   // always add the current user and SPARK_USER to the viewAcls
   private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
-    Option(System.getenv("SPARK_USER")).getOrElse(""))
+    Option(System.getenv("SPARK_USER")).getOrElse("")).filter(!_.isEmpty)
 
   setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
   setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
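
A minimal sketch (the user names and values are made up, not from the commit) of what the added .filter(!_.isEmpty) guards against: when SPARK_USER is unset, getOrElse("") used to leave an empty user name in the default ACL set, which then flowed into the view and modify ACLs.

object EmptyAclSketch {
  def main(args: Array[String]): Unit = {
    val sparkUser: Option[String] = None  // pretend SPARK_USER is not set
    val before = Set[String]("alice", sparkUser.getOrElse(""))
    val after  = Set[String]("alice", sparkUser.getOrElse("")).filter(!_.isEmpty)
    println(before)  // Set(alice, ) -- the empty entry would leak into the ACLs
    println(after)   // Set(alice)
  }
}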

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 9 additions & 3 deletions
@@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging {
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
   // Initialize the Spark UI, registering all associated listeners
-  private[spark] val ui = new SparkUI(this)
-  ui.bind()
+  private[spark] val ui: Option[SparkUI] =
+    if (conf.getBoolean("spark.ui.enabled", true)) {
+      Some(new SparkUI(this))
+    } else {
+      // For tests, do not enable the UI
+      None
+    }
+  ui.foreach(_.bind())
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
@@ -990,7 +996,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
     postApplicationEnd()
-    ui.stop()
+    ui.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
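
With ui now an Option[SparkUI], a caller can opt out of starting the web UI entirely. A minimal sketch (the master and app name are placeholders) of how a test might use the new spark.ui.enabled flag; because ui is None, bind() and stop() become no-ops via foreach:

import org.apache.spark.{SparkConf, SparkContext}

object NoUiSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("no-ui-sketch")
      .set("spark.ui.enabled", "false")  // sc.ui will be None, so no Jetty server is bound
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 10).sum())  // jobs run exactly as before, just without a UI
    } finally {
      sc.stop()  // ui.foreach(_.stop()) is a no-op when the UI is disabled
    }
  }
}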

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 2 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{File, PrintStream}
-import java.lang.reflect.InvocationTargetException
+import java.lang.reflect.{Modifier, InvocationTargetException}
 import java.net.URL
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -323,7 +323,9 @@ object SparkSubmit {
     }
 
     val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
-
+    if (!Modifier.isStatic(mainMethod.getModifiers)) {
+      throw new IllegalStateException("The main method in the given main class must be static")
+    }
     try {
      mainMethod.invoke(null, childArgs.toArray)
     } catch {
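
The new guard uses java.lang.reflect.Modifier to reject a main method that is not static before invoking it with a null receiver. A standalone sketch of the same check (the helper object and method names are made up for illustration):

import java.lang.reflect.Modifier

object StaticMainCheckSketch {
  // Throws if `mainClass` resolves a non-static main(Array[String]) method.
  def requireStaticMain(mainClass: Class[_]): Unit = {
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException(
        s"The main method in ${mainClass.getName} must be static")
    }
  }
}

In practice this catches applications whose entry point is a plain Scala class with an instance-level main rather than an object, which would otherwise fail later with a less obvious error from invoke(null, ...).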

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 10 additions & 1 deletion
@@ -360,7 +360,16 @@ private[spark] class Executor(
         if (!taskRunner.attemptedTask.isEmpty) {
           Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
             metrics.updateShuffleReadMetrics
-            tasksMetrics += ((taskRunner.taskId, metrics))
+            if (isLocal) {
+              // JobProgressListener will hold an reference of it during
+              // onExecutorMetricsUpdate(), then JobProgressListener can not see
+              // the changes of metrics any more, so make a deep copy of it
+              val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+              tasksMetrics += ((taskRunner.taskId, copiedMetrics))
+            } else {
+              // It will be copied by serialization
+              tasksMetrics += ((taskRunner.taskId, metrics))
+            }
           }
         }
       }
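
The deep copy above goes through Spark's internal Utils.serialize/Utils.deserialize helpers. A library-agnostic sketch of the same serialize-then-deserialize trick using plain java.io, which is roughly what those helpers do:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object DeepCopySketch {
  // Returns a structurally equal but independent copy of any Serializable value.
  def deepCopy[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

In local mode the executor and the JobProgressListener share one JVM, so handing the listener the live TaskMetrics object would let later mutations show through; the copy pins the values at the moment they are reported.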

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
       conf.set(s"spark.$filterName.params", filterParams)
-      JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
     }
   }
 }

core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
 import org.apache.spark.{Logging, SparkContext, SparkEnv}
@@ -47,16 +46,17 @@ private[spark] class SimrSchedulerBackend(
 
     val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)
+    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
 
     logInfo("Writing to HDFS file: " + driverFilePath)
     logInfo("Writing Akka address: " + driverUrl)
-    logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
+    logInfo("Writing Spark UI Address: " + appUIAddress)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file
     val temp = fs.create(tmpPath, true)
     temp.writeUTF(driverUrl)
     temp.writeInt(maxCores)
-    temp.writeUTF(sc.ui.appUIAddress)
+    temp.writeUTF(appUIAddress)
     temp.close()
 
     // "Atomic" rename

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 3 additions & 1 deletion
@@ -67,8 +67,10 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
+    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
+    val eventLogDir = sc.eventLogger.map(_.logDir)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+      appUIAddress, eventLogDir)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
