
Commit ab71f7f

Merge remote-tracking branch 'apache-github/master' into doc-fix
Conflicts: docs/streaming-programming-guide.md
2 parents: 8d6ff9b + 3292e2a

165 files changed (+1725, −355 lines)

.gitignore

Lines changed: 6 additions & 0 deletions
@@ -49,3 +49,9 @@ unit-tests.log
 /lib/
 rat-results.txt
 scalastyle.txt
+
+# For Hive
+metastore_db/
+metastore/
+warehouse/
+TempStatsStore/

bin/compute-classpath.sh

Lines changed: 15 additions & 0 deletions
@@ -32,6 +32,12 @@ CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"
 
 ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"
 
+if [ -n "${JAVA_HOME}" ]; then
+  JAR_CMD="${JAVA_HOME}/bin/jar"
+else
+  JAR_CMD="jar"
+fi
+
 # First check if we have a dependencies jar. If so, include binary classes with the deps jar
 if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
   CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
@@ -44,6 +50,7 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
   CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
+  CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
 
   DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar`
   CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR"
@@ -54,6 +61,14 @@ else
   else
     ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*.jar`
   fi
+  jar_error_check=$($JAR_CMD -tf $ASSEMBLY_JAR org/apache/spark/SparkContext 2>&1)
+  if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
+    echo "Loading Spark jar with '$JAR_CMD' failed. "
+    echo "This is likely because Spark was compiled with Java 7 and run "
+    echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark "
+    echo "or build Spark with Java 6."
+    exit 1
+  fi
   CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
 fi

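The probe above runs `$JAR_CMD -tf` against the assembly because an assembly jar built by Java 7 can use ZIP64 extensions that Java 6's zip code cannot read, which surfaces as an "invalid CEN header" error (SPARK-1703). A rough, self-contained Scala sketch of the same failure mode, purely for illustration (the object name and jar path are made up, not part of this commit):

import java.util.jar.JarFile
import java.util.zip.ZipException

object AssemblyJarProbe {
  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse("spark-assembly.jar")
    try {
      val jar = new JarFile(path)
      println(s"Readable assembly: ${jar.size()} entries in $path")
      jar.close()
    } catch {
      // On Java 6, a ZIP64 assembly produced by Java 7 typically fails here,
      // often with an "invalid CEN header" message.
      case e: ZipException => println(s"Cannot read $path: ${e.getMessage}")
    }
  }
}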
bin/pyspark

Lines changed: 1 addition & 0 deletions
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP

bin/pyspark2.cmd

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
 if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
 
 set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py

bin/spark-class

Lines changed: 8 additions & 1 deletion
@@ -138,7 +138,14 @@ if [ -e "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar ]; then
 fi
 
 # Compute classpath using external script
-CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
+classpath_output=$($FWDIR/bin/compute-classpath.sh)
+if [[ "$?" != "0" ]]; then
+  echo "$classpath_output"
+  exit 1
+else
+  CLASSPATH=$classpath_output
+fi
+
 if [[ "$1" =~ org.apache.spark.tools.* ]]; then
   CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
 fi

bin/spark-submit

Lines changed: 2 additions & 2 deletions
@@ -18,7 +18,7 @@
 #
 
 export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
-ORIG_ARGS=$@
+ORIG_ARGS=("$@")
 
 while (($#)); do
   if [ "$1" = "--deploy-mode" ]; then
@@ -39,5 +39,5 @@ if [ ! -z $DRIVER_MEMORY ] && [ ! -z $DEPLOY_MODE ] && [ $DEPLOY_MODE = "client"
   export SPARK_MEM=$DRIVER_MEMORY
 fi
 
-$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit $ORIG_ARGS
+$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"

core/pom.xml

Lines changed: 46 additions & 6 deletions
@@ -38,12 +38,6 @@
     <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
@@ -69,6 +63,10 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
@@ -294,6 +292,48 @@
         </environmentVariables>
       </configuration>
     </plugin>
+    <!-- Unzip py4j so we can include its files in the jar -->
+    <plugin>
+      <groupId>org.codehaus.mojo</groupId>
+      <artifactId>exec-maven-plugin</artifactId>
+      <version>1.2.1</version>
+      <executions>
+        <execution>
+          <phase>generate-resources</phase>
+          <goals>
+            <goal>exec</goal>
+          </goals>
+        </execution>
+      </executions>
+      <configuration>
+        <executable>unzip</executable>
+        <workingDirectory>../python</workingDirectory>
+        <arguments>
+          <argument>-o</argument>
+          <argument>lib/py4j*.zip</argument>
+          <argument>-d</argument>
+          <argument>build</argument>
+        </arguments>
+      </configuration>
+    </plugin>
   </plugins>
+
+  <resources>
+    <resource>
+      <directory>src/main/resources</directory>
+    </resource>
+    <resource>
+      <directory>../python</directory>
+      <includes>
+        <include>pyspark/*.py</include>
+      </includes>
+    </resource>
+    <resource>
+      <directory>../python/build</directory>
+      <includes>
+        <include>py4j/*.py</include>
+      </includes>
+    </resource>
+  </resources>
 </build>
 </project>

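With ../python and ../python/build added as resource directories, the pyspark and py4j sources should be packaged into the core jar itself, which is what later allows workers to be launched without a SPARK_HOME-derived PYTHONPATH. A quick, hypothetical way to check this from Scala after packaging (the resource path is an assumption based on the include patterns above, not something stated in the commit):

object BundledPySparkCheck {
  def main(args: Array[String]): Unit = {
    // If the resource rules above took effect, pyspark/*.py sits at the jar root.
    val worker = getClass.getClassLoader.getResource("pyspark/worker.py")
    println(if (worker != null) s"bundled at $worker" else "pyspark sources not on classpath")
  }
}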
core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 3 deletions
@@ -156,13 +156,11 @@ object SparkEnv extends Logging {
       conf.set("spark.driver.port", boundPort.toString)
     }
 
-    val classLoader = Thread.currentThread.getContextClassLoader
-
     // Create an instance of the class named by the given Java system property, or by
     // defaultClassName if the property is not set, and return it as a T
     def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
       val name = conf.get(propertyName, defaultClassName)
-      val cls = Class.forName(name, true, classLoader)
+      val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
       // First try with the constructor that takes SparkConf. If we can't find one,
       // use a no-arg constructor instead.
       try {

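This stops capturing the context class loader once during SparkEnv construction and instead resolves a loader each time a class is instantiated. The body of Utils.getContextOrSparkClassLoader is not shown in this diff; a minimal sketch of the behavior its name suggests (context loader if present, otherwise the loader that loaded Spark's own classes):

object ClassLoaderSketch {
  // Sketch only: assumes the helper prefers the current thread's context class
  // loader and falls back to the loader of Spark's own classes when it is null.
  def contextOrSparkClassLoader: ClassLoader =
    Option(Thread.currentThread.getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

  // Resolving the loader at each call, as instantiateClass now does, picks up
  // loaders installed after SparkEnv was created (e.g. by an application container).
  def instantiate[T](className: String): T =
    Class.forName(className, true, contextOrSparkClassLoader)
      .newInstance().asInstanceOf[T]
}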
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 10 additions & 1 deletion
@@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
-    val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+    val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+
+    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
+    context.addOnCompleteCallback(() =>
+      try {
+        worker.close()
+      } catch {
+        case e: Exception => logWarning("Failed to close worker socket", e)
+      }
+    )
 
     @volatile var readerException: Exception = null
 

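The callback can close the socket unconditionally because, as the new comment notes, closing a socket is idempotent: java.net.Socket.close() on an already-closed socket is a no-op. A tiny stand-alone illustration of that property (not part of the commit):

import java.net.Socket

object IdempotentClose {
  def main(args: Array[String]): Unit = {
    val s = new Socket()   // unconnected socket, enough to demonstrate close()
    s.close()
    s.close()              // second close is a no-op, not an error
    println(s"closed twice without error, isClosed = ${s.isClosed}")
  }
}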
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 2 additions & 8 deletions
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
     // Create and start the worker
-    val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-    val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+    val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
     val workerEnv = pb.environment()
     workerEnv.putAll(envVars)
-    val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-    workerEnv.put("PYTHONPATH", pythonPath)
     val worker = pb.start()
 
     // Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
     try {
       // Create and start the daemon
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       daemon = pb.start()
 
       // Redirect the stderr to ours

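Launching the worker and daemon with python -m instead of an absolute SPARK_HOME path relies on the pyspark package being importable via PYTHONPATH (set by bin/pyspark and the py4j zip above). A stand-alone Scala sketch of that launch style, with made-up names and none of the factory's Spark plumbing:

import scala.collection.JavaConverters._

object ModuleLaunchSketch {
  def main(args: Array[String]): Unit = {
    // Run a Python module by name; the interpreter resolves "pyspark.daemon" through
    // PYTHONPATH / sys.path, so no SPARK_HOME-derived script path is needed.
    val pb = new ProcessBuilder(Seq("python", "-m", "pyspark.daemon").asJava)
    pb.environment().put("PYSPARK_EXAMPLE_FLAG", "1")  // stand-in for workerEnv.putAll(envVars)
    val daemon = pb.start()
    println("launched: python -m pyspark.daemon")
  }
}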