
Commit c87e8b6

Merge pull request #3 from apache/master
merge lastest spark
2 parents cb1852d + 8767565 commit c87e8b6

80 files changed (+763, -332 lines)


assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/pom.xml

Lines changed: 7 additions & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
@@ -319,6 +319,12 @@
       <artifactId>selenium-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- Added for selenium: -->
+    <dependency>
+      <groupId>xml-apis</groupId>
+      <artifactId>xml-apis</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 59 additions & 17 deletions
@@ -19,26 +19,27 @@ package org.apache.spark.api.python

 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}
-
-import org.apache.spark.input.PortableDataStream
+import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}

 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials

 import com.google.common.base.Charsets.UTF_8
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
+
 import org.apache.spark._
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils

+import scala.util.control.NonFatal
+
 private[spark] class PythonRDD(
     @transient parent: RDD[_],
     command: Array[Byte],
@@ -341,21 +342,33 @@ private[spark] object PythonRDD extends Logging {
   /**
    * Adapter for calling SparkContext#runJob from Python.
    *
-   * This method will return an iterator of an array that contains all elements in the RDD
+   * This method will serve an iterator of an array that contains all elements in the RDD
    * (effectively a collect()), but allows you to run on a certain subset of partitions,
    * or to enable local execution.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
    */
   def runJob(
       sc: SparkContext,
       rdd: JavaRDD[Array[Byte]],
       partitions: JArrayList[Int],
-      allowLocal: Boolean): Iterator[Array[Byte]] = {
+      allowLocal: Boolean): Int = {
     type ByteArray = Array[Byte]
     type UnrolledPartition = Array[ByteArray]
     val allPartitions: Array[UnrolledPartition] =
       sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
     val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
-    flattenedPartition.iterator
+    serveIterator(flattenedPartition.iterator,
+      s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
+  }
+
+  /**
+   * A helper function to collect an RDD as an iterator, then serve it via socket.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
+   */
+  def collectAndServe[T](rdd: RDD[T]): Int = {
+    serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
   }

   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
@@ -575,15 +588,44 @@ private[spark] object PythonRDD extends Logging {
         dataOut.write(bytes)
   }

-  def writeToFile[T](items: java.util.Iterator[T], filename: String) {
-    import scala.collection.JavaConverters._
-    writeToFile(items.asScala, filename)
-  }
+  /**
+   * Create a socket server and a background thread to serve the data in `items`,
+   *
+   * The socket server can only accept one connection, or close if no connection
+   * in 3 seconds.
+   *
+   * Once a connection comes in, it tries to serialize all the data in `items`
+   * and send them into this connection.
+   *
+   * The thread will terminate after all the data are sent or any exceptions happen.
+   */
+  private def serveIterator[T](items: Iterator[T], threadName: String): Int = {
+    val serverSocket = new ServerSocket(0, 1)
+    serverSocket.setReuseAddress(true)
+    // Close the socket if no connection in 3 seconds
+    serverSocket.setSoTimeout(3000)
+
+    new Thread(threadName) {
+      setDaemon(true)
+      override def run() {
+        try {
+          val sock = serverSocket.accept()
+          val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
+          try {
+            writeIteratorToStream(items, out)
+          } finally {
+            out.close()
+          }
+        } catch {
+          case NonFatal(e) =>
+            logError(s"Error while sending iterator", e)
+        } finally {
+          serverSocket.close()
+        }
+      }
+    }.start()

-  def writeToFile[T](items: Iterator[T], filename: String) {
-    val file = new DataOutputStream(new FileOutputStream(filename))
-    writeIteratorToStream(items, file)
-    file.close()
+    serverSocket.getLocalPort
   }

   private def getMergedConf(confAsMap: java.util.HashMap[String, String],
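
Note: with this change, runJob and the new collectAndServe no longer hand the collected bytes back through the Py4J call; they return a local port, and the caller pulls the data from that socket. Below is a minimal, hypothetical client sketch (not part of this commit), assuming writeIteratorToStream frames each Array[Byte] element as a 4-byte length prefix followed by the raw bytes and then closes the stream:

import java.io.{DataInputStream, EOFException}
import java.net.Socket

object ServedIteratorClient {
  // Hypothetical helper: drain the socket served by PythonRDD.serveIterator.
  def readAll(port: Int): Seq[Array[Byte]] = {
    val sock = new Socket("localhost", port) // the server accepts a single connection
    val in = new DataInputStream(sock.getInputStream)
    val records = Seq.newBuilder[Array[Byte]]
    try {
      while (true) {
        val length = in.readInt() // assumed framing: 4-byte length prefix per record
        val buf = new Array[Byte](length)
        in.readFully(buf)
        records += buf
      }
    } catch {
      case _: EOFException => // server closed the stream: all records received
    } finally {
      sock.close()
    }
    records.result()
  }
}

The Python side of this change would read from the returned port in a similar way; the sketch only illustrates the wire contract under the stated framing assumption.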

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum))
+        memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ private[spark] class Master(
   val webUi = new MasterWebUI(this, webUiPort)

   val masterPublicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else host
   }
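
The switch from System.getenv to conf.getenv here (and in Worker and WebUI below) lets tests inject SPARK_PUBLIC_DNS through a SparkConf subclass instead of the process environment, as the SPARK-6175 test later in this commit does. A small, hedged sketch of the fallback logic in isolation — the lookup is passed in as a plain function standing in for conf.getenv, and the object, host names, and values are placeholders:

object PublicAddressSketch {
  // Env-var-with-host-fallback, as Master.masterPublicAddress and Worker.publicAddress compute it.
  def publicAddress(getenv: String => String, host: String): String = {
    val envVar = getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else host
  }

  def main(args: Array[String]): Unit = {
    // Real environment: falls back to the host when SPARK_PUBLIC_DNS is unset.
    println(publicAddress(name => System.getenv(name), "node-1.internal"))
    // Stubbed environment, the way a test can stub it by overriding SparkConf.getenv.
    val fakeEnv = Map("SPARK_PUBLIC_DNS" -> "public.example.com").withDefaultValue(null)
    println(publicAddress(fakeEnv, "node-1.internal"))
  }
}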

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 1 deletion
@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val webUiPort: Int,
+    val publicAddress: String,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -140,7 +141,8 @@
     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

     // Add webUI log urls
-    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+    val baseUrl =
+      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
     builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
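
For illustration, a hedged sketch (placeholder values, not from this commit) of how the executor log URLs look after this change — the host now comes from the worker's public address and the port from its bound web UI port:

object LogUrlSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical inputs; in the Worker these are publicAddress and webUi.boundPort.
    val publicAddress = "public.example.com"
    val webUiPort = 8081
    val appId = "app-0001"
    val execId = 7
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    println(baseUrl + "stderr") // exported as SPARK_LOG_URL_STDERR
    println(baseUrl + "stdout") // exported as SPARK_LOG_URL_STDOUT
  }
}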

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 5 additions & 4 deletions
@@ -121,7 +121,7 @@ private[spark] class Worker(
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)

   val publicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
   var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@
         self,
         workerId,
         host,
-        webUiPort,
+        webUi.boundPort,
+        publicAddress,
         sparkHome,
         executorDir,
         akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
       memory: Int,
       masterUrls: Array[String],
       workDir: String,
-      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workerNumber: Option[Int] = None,
+      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val actorName = "Worker"
     val securityMgr = new SecurityManager(conf)

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.local
 import java.nio.ByteBuffer

 import scala.concurrent.duration._
+import scala.language.postfixOps

 import akka.actor.{Actor, ActorRef, Props}

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
-  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)

   def getBasePath: String = basePath

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {

   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
-      new File("sparkHome"), new File("workDir"), "akka://worker",
+      "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
       new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }

core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala

Lines changed: 44 additions & 10 deletions
@@ -17,35 +17,69 @@

 package org.apache.spark.deploy

+import java.net.URL
+
 import scala.collection.mutable
+import scala.io.Source

-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite

 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}

-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {

   /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
+  private val WAIT_TIMEOUT_MILLIS = 10000

-  before {
+  test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+      // Browse to each URL to check that it's valid
+      info.logUrlMap.foreach { case (logType, logUrl) =>
+        val html = Source.fromURL(logUrl).mkString
+        assert(html.contains(s"$logType log page"))
+      }
+    }
   }

-  test("verify log urls get propagated from workers") {
+  test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+    val SPARK_PUBLIC_DNS = "public_dns"
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(getAll)
+      }
+    }
+    val conf = new MySparkConf()
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)

-    val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(_.toString)
-    rdd2.setName("Target RDD")
-    rdd2.count()
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()

     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { logUrl =>
+        assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+      }
     }
   }

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
-      new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
       ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 4 additions & 3 deletions
@@ -386,10 +386,11 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
   }

   test("fetch hcfs dir") {
-    val sourceDir = Utils.createTempDir()
+    val tempDir = Utils.createTempDir()
+    val sourceDir = new File(tempDir, "source-dir")
     val innerSourceDir = Utils.createTempDir(root=sourceDir.getPath)
     val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
-    val targetDir = new File(Utils.createTempDir(), "target-dir")
+    val targetDir = new File(tempDir, "target-dir")
     Files.write("some text", sourceFile, UTF_8)

     val path = new Path("file://" + sourceDir.getAbsolutePath)
@@ -413,7 +414,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(destInnerFile.isFile())

     val filePath = new Path("file://" + sourceFile.getAbsolutePath)
-    val testFileDir = new File("test-filename")
+    val testFileDir = new File(tempDir, "test-filename")
     val testFileName = "testFName"
     val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
     Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),

docs/_layouts/global.html

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@
 <li><a href="programming-guide.html">Spark Programming Guide</a></li>
 <li class="divider"></li>
 <li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
-<li><a href="sql-programming-guide.html">Spark SQL</a></li>
+<li><a href="sql-programming-guide.html">DataFrames and SQL</a></li>
 <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
 <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
 <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>

docs/index.md

Lines changed: 1 addition & 1 deletion
@@ -74,7 +74,7 @@ options for deployment:
   in all supported languages (Scala, Java, Python)
 * Modules built on Spark:
   * [Spark Streaming](streaming-programming-guide.html): processing real-time data streams
-  * [Spark SQL](sql-programming-guide.html): support for structured data and relational queries
+  * [Spark SQL and DataFrames](sql-programming-guide.html): support for structured data and relational queries
   * [MLlib](mllib-guide.html): built-in machine learning library
   * [GraphX](graphx-programming-guide.html): Spark's new API for graph processing
   * [Bagel (Pregel on Spark)](bagel-programming-guide.html): older, simple graph processing model
