
Commit 31520eb

Merge branch 'master' of https://github.com/apache/spark into SPARK-1629

2 parents: fcaafd7 + ff5be9a

95 files changed: +2100 / -978 lines


bagel/pom.xml

Lines changed: 0 additions & 14 deletions
@@ -31,20 +31,6 @@
   <name>Spark Project Bagel</name>
   <url>http://spark.apache.org/</url>
 
-  <profiles>
-    <profile>
-      <!-- SPARK-1121: SPARK-1121: Adds an explicit dependency on Avro to work around
-           a Hadoop 0.23.X issue -->
-      <id>yarn-alpha</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro</artifactId>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>

bin/compute-classpath.sh

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
 . $FWDIR/bin/load-spark-env.sh
 
 # Build up classpath
-CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"
+CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"
 
 ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"
 

bin/pyspark

Lines changed: 1 addition & 0 deletions
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP

bin/pyspark2.cmd

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
 if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
 
 set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py

bin/spark-class

Lines changed: 5 additions & 4 deletions
@@ -73,11 +73,13 @@ case "$1" in
     OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
     ;;
 
-  # All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
-  'org.apache.spark.repl.Main')
-    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
+  # Spark submit uses SPARK_SUBMIT_OPTS and SPARK_JAVA_OPTS
+  'org.apache.spark.deploy.SparkSubmit')
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS \
+      -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
     OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
     ;;
+
   *)
     OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
     OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
@@ -98,7 +100,6 @@ fi
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$OUR_JAVA_OPTS"
-JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$_SPARK_LIBRARY_PATH"
 JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e "$FWDIR/conf/java-opts" ] ; then

bin/spark-shell

Lines changed: 5 additions & 6 deletions
@@ -20,7 +20,6 @@
 #
 # Shell script for starting the Spark Shell REPL
 
-args="$@"
 cygwin=false
 case "`uname`" in
   CYGWIN*) cygwin=true;;
@@ -46,12 +45,12 @@ function main(){
     # "Backspace sends ^H" setting in "Keys" section of the Mintty options
     # (see https://github.com/sbt/sbt/issues/562).
     stty -icanon min 1 -echo > /dev/null 2>&1
-    export SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Djline.terminal=unix"
-    $FWDIR/bin/spark-submit spark-internal "$args" --class org.apache.spark.repl.Main
+    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
+    $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
     stty icanon echo > /dev/null 2>&1
   else
-    export SPARK_REPL_OPTS
-    $FWDIR/bin/spark-submit spark-internal "$args" --class org.apache.spark.repl.Main
+    export SPARK_SUBMIT_OPTS
+    $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
   fi
 }
 
@@ -83,7 +82,7 @@ if [[ ! $? ]]; then
   saved_stty=""
 fi
 
-main
+main "$@"
 
 # record the exit status lest it be overwritten:
 # then reenable echo and propagate the code.

bin/spark-submit

Lines changed: 3 additions & 3 deletions
@@ -26,11 +26,11 @@ while (($#)); do
   elif [ "$1" = "--driver-memory" ]; then
     DRIVER_MEMORY=$2
   elif [ "$1" = "--driver-library-path" ]; then
-    export _SPARK_LIBRARY_PATH=$2
+    export SPARK_SUBMIT_LIBRARY_PATH=$2
   elif [ "$1" = "--driver-class-path" ]; then
-    export SPARK_CLASSPATH="$SPARK_CLASSPATH:$2"
+    export SPARK_SUBMIT_CLASSPATH=$2
  elif [ "$1" = "--driver-java-options" ]; then
-    export SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $2"
+    export SPARK_SUBMIT_OPTS=$2
   fi
   shift
 done

core/pom.xml

Lines changed: 42 additions & 22 deletions
@@ -30,19 +30,6 @@
   <packaging>jar</packaging>
   <name>Spark Project Core</name>
   <url>http://spark.apache.org/</url>
-  <!-- SPARK-1121: Adds an explicit dependency on Avro to work around a Hadoop 0.23.X issue -->
-  <profiles>
-    <profile>
-      <id>yarn-alpha</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro</artifactId>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -147,15 +134,6 @@
       <groupId>org.json4s</groupId>
       <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
       <version>3.2.6</version>
-      <!-- see also exclusion for lift-json; this is necessary since it depends on
-           scala-library and scalap 2.10.0, but we use 2.10.4, and only override
-           scala-library -->
-      <exclusions>
-        <exclusion>
-          <groupId>org.scala-lang</groupId>
-          <artifactId>scalap</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>colt</groupId>
@@ -316,6 +294,48 @@
          </environmentVariables>
        </configuration>
      </plugin>
+      <!-- Unzip py4j so we can include its files in the jar -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>unzip</executable>
+          <workingDirectory>../python</workingDirectory>
+          <arguments>
+            <argument>-o</argument>
+            <argument>lib/py4j*.zip</argument>
+            <argument>-d</argument>
+            <argument>build</argument>
+          </arguments>
+        </configuration>
+      </plugin>
     </plugins>
+
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>../python</directory>
+        <includes>
+          <include>pyspark/*.py</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>../python/build</directory>
+        <includes>
+          <include>py4j/*.py</include>
+        </includes>
+      </resource>
+    </resources>
   </build>
 </project>

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 21 additions & 1 deletion
@@ -18,7 +18,7 @@
 package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList, Iterator => JIterator}
-import java.lang.{Iterable => JIterable}
+import java.lang.{Iterable => JIterable, Long => JLong}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -264,6 +264,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
   }
 
+  /**
+   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
+   * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
+   * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
+   */
+  def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
+    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
+  }
+
+  /**
+   * Zips this RDD with its element indices. The ordering is first based on the partition index
+   * and then the ordering of items within each partition. So the first item in the first
+   * partition gets index 0, and the last item in the last partition receives the largest index.
+   * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
+   * This method needs to trigger a spark job when this RDD contains more than one partitions.
+   */
+  def zipWithIndex(): JavaPairRDD[T, JLong] = {
+    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
+  }
+
   // Actions (launch a job to return a value to the user program)
 
   /**
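
The two new Java wrappers simply delegate to the Scala RDD methods of the same name. A minimal usage sketch against the underlying Scala API, assuming an existing SparkContext named sc; the results in comments illustrate the documented semantics rather than captured output:

  // Sketch only: `sc` is assumed to be a live SparkContext.
  val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)

  // Contiguous indices 0..n-1; may launch a job when the RDD has more than one partition.
  rdd.zipWithIndex().collect()
  // e.g. Array((a,0), (b,1), (c,2), (d,3))

  // Per-partition ids k, n+k, 2n+k, ...: unique but possibly with gaps; no job is launched.
  rdd.zipWithUniqueId().collect()
  // e.g. with 2 partitions: Array((a,0), (b,2), (c,1), (d,3))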

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 2 additions & 8 deletions
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       val worker = pb.start()
 
       // Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
     try {
       // Create and start the daemon
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       daemon = pb.start()
 
       // Redirect the stderr to ours

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 13 additions & 0 deletions
@@ -23,6 +23,7 @@ import java.net.{URI, URL}
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
 import org.apache.spark.executor.ExecutorURLClassLoader
+import org.apache.spark.util.Utils
 
 /**
  * Scala code behind the spark-submit script. The script handles setting up the classpath with
@@ -128,6 +129,18 @@ object SparkSubmit {
       childArgs += ("--class", appArgs.mainClass)
     }
 
+    if (clusterManager == YARN) {
+      // The choice of class is arbitrary, could use any spark-yarn class
+      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
+        val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " +
+          "with YARN support."
+        throw new Exception(msg)
+      }
+    }
+
+    // Special flag to avoid deprecation warnings at the client
+    sysProps("SPARK_SUBMIT") = "true"
+
     val options = List[OptionAssigner](
       new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
       new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
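
For readers unfamiliar with the helper used above: a class-loadability probe of this kind typically reduces to a reflective lookup that does not initialize the class. The following is only an illustrative sketch, not the actual Utils.classIsLoadable implementation, which is not part of this diff:

  import scala.util.Try

  // Hypothetical stand-in: true if the named class resolves on the current classpath.
  def classIsLoadable(clazz: String): Boolean = {
    Try(Class.forName(clazz, false, Thread.currentThread().getContextClassLoader)).isSuccess
  }

  // The check in SparkSubmit uses such a probe to fail fast when a YARN deploy is
  // requested against a Spark build compiled without the spark-yarn module.
  classIsLoadable("org.apache.spark.deploy.yarn.Client")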

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 3 additions & 1 deletion
@@ -298,7 +298,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         | --driver-memory MEM        Memory for driver (e.g. 1000M, 2G) (Default: 512M).
         | --driver-java-options      Extra Java options to pass to the driver
         | --driver-library-path      Extra library path entries to pass to the driver
-        | --driver-class-path        Extra class path entries to pass to the driver
+        | --driver-class-path        Extra class path entries to pass to the driver. Note that
+        |                            jars added with --jars are automatically included in the
+        |                            classpath.
         |
         | --executor-memory MEM      Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |

core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala

Lines changed: 5 additions & 4 deletions
@@ -48,7 +48,8 @@ object CommandUtils extends Logging {
   def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
     val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
     // Note, this will coalesce multiple options into a single command component
-    val extraOpts = command.extraJavaOptions.toSeq
+    val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())
+
     val libraryOpts =
       if (command.libraryPathEntries.size > 0) {
         val joined = command.libraryPathEntries.mkString(File.pathSeparator)
@@ -62,10 +63,10 @@ object CommandUtils extends Logging {
     val classPath = Utils.executeAndGetOutput(
       Seq(sparkHome + "/bin/compute-classpath" + ext),
       extraEnvironment=command.environment)
-    val userClassPath = command.classPathEntries.mkString(File.pathSeparator)
-    val classPathWithUser = classPath + File.pathSeparator + userClassPath
+    val userClassPath = command.classPathEntries ++ Seq(classPath)
 
-    Seq("-cp", classPathWithUser) ++ libraryOpts ++ extraOpts ++ memoryOpts
+    Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
+      libraryOpts ++ extraOpts ++ memoryOpts
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
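
The splitCommandString change matters because extraJavaOptions arrives as one string but must become one command-line token per JVM flag; the previous Option.toSeq produced a single coalesced element. A simplified sketch of the intent, with hypothetical values; the real Utils.splitCommandString also handles quoting and escaping, which a plain whitespace split does not:

  // Illustration only; option values are made up.
  val extraJavaOptions: Option[String] = Some("-XX:+UseG1GC -Dspark.foo=bar")

  // Before: one coalesced element, Seq("-XX:+UseG1GC -Dspark.foo=bar").
  val coalesced = extraJavaOptions.toSeq

  // After (naive stand-in for Utils.splitCommandString): one element per flag.
  val split = extraJavaOptions.map(_.trim.split("\\s+").toSeq).getOrElse(Seq())
  // split == Seq("-XX:+UseG1GC", "-Dspark.foo=bar")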

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
 import org.apache.spark.util.{IntParam, MemoryParam, Utils}
 
 /**
- * Command-line parser for the master.
+ * Command-line parser for the worker.
  */
 private[spark] class WorkerArguments(args: Array[String]) {
   var host = Utils.localHostName()

core/src/main/scala/org/apache/spark/network/SecurityMessage.scala

Lines changed: 0 additions & 1 deletion
@@ -106,7 +106,6 @@ private[spark] class SecurityMessage() extends Logging {
    * @return BufferMessage
    */
   def toBufferMessage: BufferMessage = {
-    val startTime = System.currentTimeMillis
     val buffers = new ArrayBuffer[ByteBuffer]()
 
     // 4 bytes for the length of the connectionId

core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala

Lines changed: 0 additions & 1 deletion
@@ -61,7 +61,6 @@ private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Doub
     } else if (outputsMerged == 0) {
       new HashMap[T, BoundedDouble]
     } else {
-      val p = outputsMerged.toDouble / totalOutputs
       val studentTCacher = new StudentTCacher(confidence)
       val result = new JHashMap[T, BoundedDouble](sums.size)
       val iter = sums.entrySet.iterator()

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 5 additions & 1 deletion
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -54,7 +55,7 @@ private[spark] class EventLoggingListener(
 
   private val logger =
     new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite)
+      shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
 
   /**
    * Begin logging events.
@@ -124,6 +125,9 @@ private[spark] object EventLoggingListener extends Logging {
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+  val LOG_FILE_PERMISSIONS: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
+
 
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
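
A quick note on the permission constant: FsPermission.createImmutable takes the POSIX mode bits as a Short, so the octal string "770" is parsed with radix 8. A small sketch of the arithmetic, not part of the patch itself:

  // Octal "770" = 7*64 + 7*8 + 0 = 504 decimal: read/write/execute for owner and group, nothing for others.
  val mode: Short = Integer.parseInt("770", 8).toShort  // 504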
