Commit e3e56aa

Author: liguoqiang
Merge branch 'master' into SPARK-1149
2 parents: b0d5c07 + 328c73d

File tree: 88 files changed (+2595 / -389 lines)


assembly/pom.xml

Lines changed: 4 additions & 1 deletion
@@ -29,9 +29,12 @@
   <artifactId>spark-assembly_2.10</artifactId>
   <name>Spark Project Assembly</name>
   <url>http://spark.apache.org/</url>
+  <packaging>pom</packaging>
 
   <properties>
-    <spark.jar>${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</spark.jar>
+    <spark.jar.dir>scala-${scala.binary.version}</spark.jar.dir>
+    <spark.jar.basename>${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</spark.jar.basename>
+    <spark.jar>${project.build.directory}/${spark.jar.dir}/${spark.jar.basename}</spark.jar>
     <deb.pkg.name>spark</deb.pkg.name>
     <deb.install.path>/usr/share/spark</deb.install.path>
     <deb.user>root</deb.user>
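
Note: splitting spark.jar into spark.jar.dir and spark.jar.basename lets the assembly descriptor below reference the jar's directory and file name separately, as the new fileSet in assembly.xml does.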

assembly/src/main/assembly/assembly.xml

Lines changed: 11 additions & 0 deletions
@@ -55,6 +55,15 @@
       <include>**/*</include>
     </includes>
   </fileSet>
+  <fileSet>
+    <directory>
+      ${project.parent.basedir}/assembly/target/${spark.jar.dir}
+    </directory>
+    <outputDirectory>/</outputDirectory>
+    <includes>
+      <include>${spark.jar.basename}</include>
+    </includes>
+  </fileSet>
 </fileSets>
 
 <dependencySets>
@@ -75,6 +84,8 @@
     <excludes>
       <exclude>org.apache.hadoop:*:jar</exclude>
       <exclude>org.apache.spark:*:jar</exclude>
+      <exclude>org.apache.zookeeper:*:jar</exclude>
+      <exclude>org.apache.avro:*:jar</exclude>
     </excludes>
   </dependencySet>
 </dependencySets>

core/pom.xml

Lines changed: 16 additions & 0 deletions
@@ -64,6 +64,18 @@
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-plus</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-security</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
@@ -118,6 +130,10 @@
       <artifactId>chill-java</artifactId>
       <version>0.3.1</version>
     </dependency>
+    <dependency>
+      <groupId>commons-net</groupId>
+      <artifactId>commons-net</artifactId>
+    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
       <artifactId>akka-remote_${scala.binary.version}</artifactId>

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 24 additions & 4 deletions
@@ -71,10 +71,30 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         val computedValues = rdd.computeOrReadCheckpoint(split, context)
         // Persist the result, so long as the task is not running locally
         if (context.runningLocally) { return computedValues }
-        val elements = new ArrayBuffer[Any]
-        elements ++= computedValues
-        blockManager.put(key, elements, storageLevel, tellMaster = true)
-        elements.iterator.asInstanceOf[Iterator[T]]
+        if (storageLevel.useDisk && !storageLevel.useMemory) {
+          // In the case that this RDD is to be persisted using DISK_ONLY,
+          // the iterator will be passed directly to the blockManager (rather than
+          // caching it to an ArrayBuffer first), then the resulting block data iterator
+          // will be passed back to the user. If the iterator generates a lot of data,
+          // this means that it doesn't all have to be held in memory at one time.
+          // This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
+          // blocks aren't dropped by the block store before enabling that.
+          blockManager.put(key, computedValues, storageLevel, tellMaster = true)
+          return blockManager.get(key) match {
+            case Some(values) =>
+              new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+            case None =>
+              logInfo("Failure to store %s".format(key))
+              throw new Exception("Block manager failed to return persisted value")
+          }
+        } else {
+          // In this case the RDD is cached to an array buffer. This will save the results
+          // if we're dealing with a 'one-time' iterator
+          val elements = new ArrayBuffer[Any]
+          elements ++= computedValues
+          blockManager.put(key, elements, storageLevel, tellMaster = true)
+          return elements.iterator.asInstanceOf[Iterator[T]]
+        }
       } finally {
         loading.synchronized {
           loading.remove(key)
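
The comment block above carries the rationale: with DISK_ONLY storage the computed iterator is streamed straight into the block manager instead of being materialized into an ArrayBuffer first, so peak memory no longer scales with partition size. A minimal standalone sketch of the difference (plain Scala, not Spark's API; writeToDisk is a hypothetical sink standing in for the block manager's disk store):

object IteratorStreamingSketch {
  // Hypothetical sink playing the role of the block manager's disk store.
  def writeToDisk(values: Iterator[Int]): Unit = values.foreach(_ => ())

  def main(args: Array[String]): Unit = {
    def computedValues = Iterator.range(0, 100000000) // generated lazily

    // Buffered path (the non-DISK_ONLY branch): the whole partition must
    // fit on the heap before anything reaches the store.
    // val elements = scala.collection.mutable.ArrayBuffer[Int]() ++= computedValues

    // Streaming path (the DISK_ONLY branch): the sink consumes elements
    // as they are produced, so only one element is alive at a time.
    writeToDisk(computedValues)
  }
}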

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 3 additions & 2 deletions
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
-private[spark] class HttpFileServer extends Logging {
+private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
 
   var baseDir : File = null
   var fileDir : File = null
@@ -38,9 +38,10 @@ private[spark] class HttpFileServer extends Logging {
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir)
+    httpServer = new HttpServer(baseDir, securityManager)
     httpServer.start()
     serverUri = httpServer.uri
+    logDebug("HTTP file server started at: " + serverUri)
   }
 
   def stop() {
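
HttpFileServer now has to be handed a SecurityManager at construction time. A hedged sketch of what a call site looks like; how the SecurityManager is obtained, and the initialize() name for the method containing the hunk above, are assumptions not shown in this diff:

def startFileServer(securityManager: SecurityManager): HttpFileServer = {
  val fileServer = new HttpFileServer(securityManager) // secret is threaded through to HttpServer
  fileServer.initialize() // assumed entry point wrapping the mkdir/start code above
  fileServer
}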

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 55 additions & 5 deletions
@@ -19,15 +19,18 @@ package org.apache.spark
 
 import java.io.File
 
+import org.eclipse.jetty.util.security.{Constraint, Password}
+import org.eclipse.jetty.security.authentication.DigestAuthenticator
+import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
+
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
-import org.eclipse.jetty.server.handler.DefaultHandler
-import org.eclipse.jetty.server.handler.HandlerList
-import org.eclipse.jetty.server.handler.ResourceHandler
+import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler}
 import org.eclipse.jetty.util.thread.QueuedThreadPool
 
 import org.apache.spark.util.Utils
 
+
 /**
  * Exception type thrown by HttpServer when it is in the wrong state for an operation.
  */
@@ -38,7 +41,8 @@ private[spark] class ServerStateException(message: String) extends Exception(message)
  * as well as classes created by the interpreter when the user types in code. This is just a wrapper
  * around a Jetty server.
  */
-private[spark] class HttpServer(resourceBase: File) extends Logging {
+private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
+  extends Logging {
   private var server: Server = null
   private var port: Int = -1
 
@@ -59,14 +63,60 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
       server.setThreadPool(threadPool)
       val resHandler = new ResourceHandler
       resHandler.setResourceBase(resourceBase.getAbsolutePath)
+
       val handlerList = new HandlerList
       handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-      server.setHandler(handlerList)
+
+      if (securityManager.isAuthenticationEnabled()) {
+        logDebug("HttpServer is using security")
+        val sh = setupSecurityHandler(securityManager)
+        // make sure we go through the security handler to get resources
+        sh.setHandler(handlerList)
+        server.setHandler(sh)
+      } else {
+        logDebug("HttpServer is not using security")
+        server.setHandler(handlerList)
+      }
+
       server.start()
       port = server.getConnectors()(0).getLocalPort()
     }
   }
 
+  /**
+   * Set up Jetty to use a HashLoginService holding a single user with our
+   * shared secret. Configure it to use DIGEST-MD5 authentication so that the
+   * password isn't passed in plaintext.
+   */
+  private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
+    val constraint = new Constraint()
+    // use DIGEST-MD5 as the authentication mechanism
+    constraint.setName(Constraint.__DIGEST_AUTH)
+    constraint.setRoles(Array("user"))
+    constraint.setAuthenticate(true)
+    constraint.setDataConstraint(Constraint.DC_NONE)
+
+    val cm = new ConstraintMapping()
+    cm.setConstraint(constraint)
+    cm.setPathSpec("/*")
+    val sh = new ConstraintSecurityHandler()
+
+    // the HashLoginService lets us do a single user and
+    // secret right now. This could be changed to use the
+    // JAASLoginService for other options.
+    val hashLogin = new HashLoginService()
+
+    val secretKey = securityMgr.getSecretKey()
+    if (secretKey == null) {
+      throw new Exception("Error: secret key is null with authentication on")
+    }
+    hashLogin.putUser(securityMgr.getHttpUser(), new Password(secretKey), Array("user"))
+    sh.setLoginService(hashLogin)
+    sh.setAuthenticator(new DigestAuthenticator())
+    sh.setConstraintMappings(Array(cm))
+    sh
+  }
+
   def stop() {
     if (server == null) {
       throw new ServerStateException("Server is already stopped")
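
For context, a client talking to the secured server answers the DIGEST challenge with the same shared secret. A minimal sketch using the JDK's java.net.Authenticator; the user name, secret, and URL below are hypothetical stand-ins for whatever securityManager.getHttpUser(), getSecretKey(), and the server's uri actually are:

import java.net.{Authenticator, PasswordAuthentication, URL}

object DigestClientSketch {
  def main(args: Array[String]): Unit = {
    // The JDK's HTTP stack replies to the server's DIGEST-MD5 challenge
    // with these credentials, so the secret never travels in plaintext.
    Authenticator.setDefault(new Authenticator {
      override def getPasswordAuthentication: PasswordAuthentication =
        new PasswordAuthentication("sparkHttpUser", "sharedSecret".toCharArray) // hypothetical
    })
    val in = new URL("http://localhost:33000/someFile.jar").openStream() // hypothetical uri
    try { while (in.read() != -1) {} } finally in.close()
  }
}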
