
Commit b85b0c9

Merge remote-tracking branch 'upstream/master'

2 parents: 900b586 + 424a86a

12 files changed: +156 -20 lines

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum))
+        memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ private[spark] class Master(
   val webUi = new MasterWebUI(this, webUiPort)

   val masterPublicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
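Note: the one-line change above is the heart of this merge (SPARK-6175). System.getenv cannot be faked from inside a JVM test, while SparkConf.getenv, which simply delegates to System.getenv by default, can be overridden in a subclass. A minimal sketch of the resulting test seam, assuming only SparkConf.getenv as it appears in this diff; StubEnvConf is a hypothetical name, not part of the commit:

package org.apache.spark.deploy  // spark-internal package, in case getenv is package-visible

import org.apache.spark.SparkConf

// Pretends SPARK_PUBLIC_DNS (or any other variable) is set, without
// touching the real process environment.
class StubEnvConf(env: Map[String, String]) extends SparkConf(false) {
  override def getenv(name: String): String =
    env.getOrElse(name, super.getenv(name))
}

The LogUrlsStandaloneSuite change further down uses exactly this trick, plus a clone override discussed there.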

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 1 deletion
@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
   val workerId: String,
   val host: String,
   val webUiPort: Int,
+  val publicAddress: String,
   val sparkHome: File,
   val executorDir: File,
   val workerUrl: String,
@@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
   builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

   // Add webUI log urls
-  val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+  val baseUrl =
+    s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
   builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
   builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
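Taken together, the two hunks make the advertised log links resolvable from outside the cluster: the URL host becomes the worker's public address rather than its bind host. A sketch of the resulting environment variables; every value below is a hypothetical placeholder except the URL shape, which is copied from the diff:

// Sketch only; all values are placeholders.
val publicAddress = "public_dns"   // Worker.publicAddress: SPARK_PUBLIC_DNS or the host
val webUiPort = 8081               // the UI port the worker passes in
val appId = "app-20150226000000-0000"
val execId = 7
val baseUrl =
  s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
// SPARK_LOG_URL_STDERR = s"${baseUrl}stderr"
// SPARK_LOG_URL_STDOUT = s"${baseUrl}stdout"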

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 5 additions & 4 deletions
@@ -121,7 +121,7 @@ private[spark] class Worker(
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)

   val publicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
   var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@ private[spark] class Worker(
       self,
       workerId,
       host,
-      webUiPort,
+      webUi.boundPort,
+      publicAddress,
       sparkHome,
       executorDir,
       akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
       memory: Int,
       masterUrls: Array[String],
       workDir: String,
-      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workerNumber: Option[Int] = None,
+      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val actorName = "Worker"
     val securityMgr = new SecurityManager(conf)
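Three related fixes land in this file: publicAddress is now read through the conf, ExecutorRunner receives the bound UI port (webUi.boundPort) plus the public address, and startSystemAndActor accepts a caller-supplied SparkConf, with the default argument preserving the old standalone behavior. A hedged sketch of the new call shape, argument order matching the LocalSparkCluster call above (Worker is private[spark], so this only compiles from spark-internal code; the resource values are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.worker.Worker

val (workerSystem, boundPort) = Worker.startSystemAndActor(
  "localhost", 0, 0,                 // host, port, webUiPort (0 = pick any free port)
  1, 512,                            // cores, memory in MB
  Array("spark://localhost:7077"),   // masterUrls
  null,                              // workDir (the worker picks a default)
  Some(1),                           // workerNumber, as LocalSparkCluster passes it
  new SparkConf)                     // new parameter; if omitted, a fresh conf is created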

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
-  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)

   def getBasePath: String = basePath
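With this change, Master, Worker, and every WebUI resolve their externally visible host name the same way: SPARK_PUBLIC_DNS read through the conf, falling back to the local host name. The shared pattern in miniature (the helper name is hypothetical):

import org.apache.spark.SparkConf

def publicHostName(conf: SparkConf, localHostName: String): String =
  Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)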

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {

   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
-      new File("sparkHome"), new File("workDir"), "akka://worker",
+      "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
       new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }

core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala

Lines changed: 44 additions & 10 deletions
@@ -17,35 +17,69 @@

 package org.apache.spark.deploy

+import java.net.URL
+
 import scala.collection.mutable
+import scala.io.Source

-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite

 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}

-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {

   /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
+  private val WAIT_TIMEOUT_MILLIS = 10000

-  before {
+  test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+      // Browse to each URL to check that it's valid
+      info.logUrlMap.foreach { case (logType, logUrl) =>
+        val html = Source.fromURL(logUrl).mkString
+        assert(html.contains(s"$logType log page"))
+      }
+    }
   }

-  test("verify log urls get propagated from workers") {
+  test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+    val SPARK_PUBLIC_DNS = "public_dns"
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(getAll)
+      }
+    }
+    val conf = new MySparkConf()
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)

-    val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(_.toString)
-    rdd2.setName("Target RDD")
-    rdd2.count()
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()

     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { logUrl =>
+        assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+      }
     }
   }
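A note on the clone override in MySparkConf, as a hedged reading of the test: SparkContext clones the conf it is given, and the base SparkConf.clone builds a plain SparkConf, so without the override the getenv stub would silently vanish from the copy the cluster actually uses. The pitfall in miniature (names hypothetical):

package org.apache.spark.deploy

import org.apache.spark.SparkConf

// Hypothetical subclass that forgets to override clone:
class ForgetfulConf extends SparkConf(false) {
  override def getenv(name: String): String = "stubbed"
}

object ClonePitfall extends App {
  val copied: SparkConf = new ForgetfulConf().clone
  // `copied` is a plain SparkConf again, so copied.getenv(...) consults the
  // real environment and the stubbed SPARK_PUBLIC_DNS never reaches the cluster.
}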

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
   val appDesc = new ApplicationDescription("app name", Some(8), 500,
     Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
   val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
-    new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+    "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
     ExecutorState.RUNNING)
   val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
   assert(builder.command().last === appId)

mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala

Lines changed: 8 additions & 0 deletions
@@ -22,6 +22,7 @@ import org.apache.spark.Logging
 import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.evaluation.binary._
 import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.sql.DataFrame

 /**
  * :: Experimental ::
@@ -53,6 +54,13 @@ class BinaryClassificationMetrics(
    */
   def this(scoreAndLabels: RDD[(Double, Double)]) = this(scoreAndLabels, 0)

+  /**
+   * An auxiliary constructor taking a DataFrame.
+   * @param scoreAndLabels a DataFrame with two double columns: score and label
+   */
+  private[mllib] def this(scoreAndLabels: DataFrame) =
+    this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1))))
+
   /** Unpersist intermediate RDDs used in the computation. */
   def unpersist() {
     cumulativeCounts.unpersist()
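The new constructor is private[mllib]; judging from the pyspark.mllib.evaluation docs added below, it exists so the Python side can hand scores and labels across as a DataFrame. External Scala callers keep using the public RDD constructor, sketched here with made-up scores:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

def evaluate(sc: SparkContext): Unit = {
  // Hypothetical (score, label) pairs from some binary classifier.
  val scoreAndLabels = sc.parallelize(Seq(
    (0.9, 1.0), (0.8, 1.0), (0.4, 0.0), (0.1, 0.0)))
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  println(s"Area under ROC: ${metrics.areaUnderROC()}")
}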

python/docs/pyspark.mllib.rst

Lines changed: 7 additions & 0 deletions
@@ -16,6 +16,13 @@ pyspark.mllib.clustering module
     :members:
     :undoc-members:

+pyspark.mllib.evaluation module
+-------------------------------
+
+.. automodule:: pyspark.mllib.evaluation
+    :members:
+    :undoc-members:
+
 pyspark.mllib.feature module
 -------------------------------
