
Commit cd8f59d

Merge branch 'master' of https://github.com/apache/spark into scalatest
2 parents: 046540d + bd67551

14 files changed: +62 -87 lines

README.md

Lines changed: 12 additions & 7 deletions

@@ -39,17 +39,22 @@ And run the following command, which should also return 1000:
 ## Example Programs
 
 Spark also comes with several sample programs in the `examples` directory.
-To run one of them, use `./bin/run-example <class> <params>`. For example:
+To run one of them, use `./bin/run-example <class> [params]`. For example:
 
-    ./bin/run-example org.apache.spark.examples.SparkLR local[2]
+    ./bin/run-example org.apache.spark.examples.SparkLR
 
-will run the Logistic Regression example locally on 2 CPUs.
+will run the Logistic Regression example locally.
 
-Each of the example programs prints usage help if no params are given.
+You can set the MASTER environment variable when running examples to submit
+examples to a cluster. This can be a mesos:// or spark:// URL,
+"yarn-cluster" or "yarn-client" to run on YARN, and "local" to run
+locally with one thread, or "local[N]" to run locally with N threads. You
+can also use an abbreviated class name if the class is in the `examples`
+package. For instance:
 
-All of the Spark samples take a `<master>` parameter that is the cluster URL
-to connect to. This can be a mesos:// or spark:// URL, or "local" to run
-locally with one thread, or "local[N]" to run locally with N threads.
+    MASTER=spark://host:7077 ./bin/run-example SparkPi
+
+Many of the example programs print usage help if no params are given.
 
 ## Running Tests

bin/pyspark

Lines changed: 1 addition & 1 deletion

@@ -31,7 +31,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
   ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null
   if [[ $? != 0 ]]; then
     echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2
-    echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
+    echo "You need to build Spark before running this program" >&2
     exit 1
   fi
 fi

bin/run-example

Lines changed: 18 additions & 53 deletions

@@ -17,28 +17,10 @@
 # limitations under the License.
 #
 
-cygwin=false
-case "`uname`" in
-    CYGWIN*) cygwin=true;;
-esac
-
 SCALA_VERSION=2.10
 
-# Figure out where the Scala framework is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
-
-# Export this as SPARK_HOME
 export SPARK_HOME="$FWDIR"
-
-. $FWDIR/bin/load-spark-env.sh
-
-if [ -z "$1" ]; then
-  echo "Usage: run-example <example-class> [<args>]" >&2
-  exit 1
-fi
-
-# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
-# to avoid the -sources and -doc packages that are built by publish-local.
 EXAMPLES_DIR="$FWDIR"/examples
 
 if [ -f "$FWDIR/RELEASE" ]; then
@@ -49,46 +31,29 @@ fi
 
 if [[ -z $SPARK_EXAMPLES_JAR ]]; then
   echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2
-  echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
+  echo "You need to build Spark before running this program" >&2
   exit 1
 fi
 
+EXAMPLE_MASTER=${MASTER:-"local[*]"}
 
-# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
-# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
-CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
-CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"
-
-if $cygwin; then
-  CLASSPATH=`cygpath -wp $CLASSPATH`
-  export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
-fi
-
-# Find java binary
-if [ -n "${JAVA_HOME}" ]; then
-  RUNNER="${JAVA_HOME}/bin/java"
-else
-  if [ `command -v java` ]; then
-    RUNNER="java"
-  else
-    echo "JAVA_HOME is not set" >&2
-    exit 1
-  fi
-fi
-
-# Set JAVA_OPTS to be able to load native libraries and to set heap size
-JAVA_OPTS="$SPARK_JAVA_OPTS"
-# Load extra JAVA_OPTS from conf/java-opts, if it exists
-if [ -e "$FWDIR/conf/java-opts" ] ; then
-  JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
+if [ -n "$1" ]; then
+  EXAMPLE_CLASS="$1"
+  shift
+else
+  echo "usage: ./bin/run-example <example-class> [example-args]"
+  echo "  - set MASTER=XX to use a specific master"
+  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.MovieLensALS)"
+  echo
+  exit -1
 fi
-export JAVA_OPTS
 
-if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
-  echo -n "Spark Command: "
-  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
-  echo "========================================"
-  echo
+if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
+  EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
 fi
 
-exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
+./bin/spark-submit \
+  --master $EXAMPLE_MASTER \
+  --class $EXAMPLE_CLASS \
+  $SPARK_EXAMPLES_JAR \
+  "$@"

bin/spark-class

Lines changed: 1 addition & 1 deletion

@@ -114,7 +114,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
   jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar")
   if [ "$num_jars" -eq "0" ]; then
     echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2
-    echo "You need to build Spark with 'sbt/sbt assembly' before running this program." >&2
+    echo "You need to build Spark before running this program." >&2
     exit 1
   fi
   if [ "$num_jars" -gt "1" ]; then

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 5 additions & 4 deletions

@@ -160,14 +160,15 @@ object SparkSubmit {
     // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
      OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+      OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraClassPath"),
      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraJavaOptions"),
      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraLibraryPath"),
      OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
-      OptionAssigner(args.name, YARN, true, clOption = "--name"),
+      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
      OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
      OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
      OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
@@ -188,8 +189,7 @@
      OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
-      OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name")
+      OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars")
     )
 
     // For client mode make any added jars immediately visible on the classpath
@@ -205,7 +205,8 @@
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) {
          childArgs += (opt.clOption, opt.value)
-        } else if (opt.sysProp != null) {
+        }
+        if (opt.sysProp != null) {
          sysProps.put(opt.sysProp, opt.value)
        }
      }
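
This change routes `--name` through the `spark.app.name` system property for every cluster manager (and keeps the `--name` client argument for YARN cluster mode). A minimal sketch of a driver that observes the propagated name, assuming it is launched through `spark-submit`; the `NamePrinter` class and jar name below are hypothetical, not part of this commit:

    // NamePrinter.scala -- hypothetical driver used only to illustrate the new behavior
    import org.apache.spark.{SparkConf, SparkContext}

    object NamePrinter {
      def main(args: Array[String]): Unit = {
        // An empty SparkConf picks up spark.* system properties, including the
        // spark.app.name value that spark-submit now sets from --name.
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        println("spark.app.name = " + sc.appName)
        sc.stop()
      }
    }

Submitting it with something like `./bin/spark-submit --master local[2] --name beauty --class NamePrinter name-printer.jar` should print `spark.app.name = beauty`.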

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 2 deletions

@@ -330,9 +330,9 @@ abstract class RDD[T: ClassTag](
     if (shuffle) {
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+        new ShuffledRDD[Int, T, (Int, T)](map(x => (Utils.random.nextInt(), x)),
         new HashPartitioner(numPartitions)),
-        numPartitions).keys
+        numPartitions).values
     } else {
       new CoalescedRDD(this, numPartitions)
     }
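
`coalesce(numPartitions, shuffle = true)` now keys the intermediate `ShuffledRDD` by a random `Int` rather than by the element itself, so skewed or constant-valued input no longer collapses onto a single reducer. A small self-contained check of the user-facing behavior, assuming a local master; the class name and the specific counts are illustrative only:

    import org.apache.spark.{SparkConf, SparkContext}

    object CoalesceShuffleCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("coalesce-check"))
        // 1000 identical values spread over 100 partitions; keyed by the element these
        // would all hash to one partition, keyed by a random Int they spread out.
        val rdd = sc.parallelize(Seq.fill(1000)(42), 100)
        val sizes = rdd.coalesce(10, shuffle = true)
          .mapPartitions(it => Iterator(it.size))
          .collect()
        println(sizes.mkString(", "))  // expect roughly even partition sizes
        sc.stop()
      }
    }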

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 6 additions & 9 deletions

@@ -57,15 +57,12 @@ private[spark] object ShuffleMapTask {
   }
 
   def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
-    synchronized {
-      val loader = Thread.currentThread.getContextClassLoader
-      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-      val ser = SparkEnv.get.closureSerializer.newInstance()
-      val objIn = ser.deserializeStream(in)
-      val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-      val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
-      (rdd, dep)
-    }
+    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val ser = SparkEnv.get.closureSerializer.newInstance()
+    val objIn = ser.deserializeStream(in)
+    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+    val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
+    (rdd, dep)
   }
 
   // Since both the JarSet and FileSet have the same format this is used for both.

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 1 addition & 1 deletion

@@ -250,7 +250,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       // Remove the block from the slave's BlockManager.
       // Doesn't actually wait for a confirmation and the message might get lost.
       // If message loss becomes frequent, we should add retry logic here.
-      blockManager.get.slaveActor ! RemoveBlock(blockId)
+      blockManager.get.slaveActor.ask(RemoveBlock(blockId))(akkaTimeout)
     }
   }
 }
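
Replacing the fire-and-forget `!` with `ask(...)(akkaTimeout)` gives the master a `Future` it could inspect or log if the `RemoveBlock` message is lost. A standalone sketch of that difference, assuming an Akka 2.2/2.3-era dependency; the `SlaveStub` actor and its reply are stand-ins, not Spark's actual block manager protocol:

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    case class RemoveBlock(blockId: String)

    class SlaveStub extends Actor {
      def receive = {
        case RemoveBlock(_) => sender ! true  // acknowledge the removal request
      }
    }

    object AskVsTell {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("demo")
        val slave = system.actorOf(Props[SlaveStub], "slave")
        val timeout = Timeout(30.seconds)

        slave ! RemoveBlock("rdd_0_0")                          // tell: no reply, a lost message goes unnoticed
        val reply = slave.ask(RemoveBlock("rdd_0_1"))(timeout)  // ask: a Future the caller can await or log
        println(Await.result(reply, timeout.duration))          // prints true

        system.shutdown()
      }
    }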

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 6 additions & 4 deletions

@@ -104,7 +104,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
       "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
       "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
       "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6",
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "beauty",
       "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
@@ -122,16 +122,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     childArgsStr should include ("--num-executors 6")
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
     classpath should have length (0)
-    sysProps should have size (1)
+    sysProps("spark.app.name") should be ("beauty")
+    sysProps("SPARK_SUBMIT") should be ("true")
   }
 
   test("handles YARN client mode") {
     val clArgs = Seq("--deploy-mode", "client",
       "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
       "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
       "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "thejar.jar",
-      "arg1", "arg2")
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "trill",
+      "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -140,6 +141,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     classpath should contain ("one.jar")
     classpath should contain ("two.jar")
     classpath should contain ("three.jar")
+    sysProps("spark.app.name") should be ("trill")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.executor.cores") should be ("5")
     sysProps("spark.yarn.queue") should be ("thequeue")

docs/building-with-maven.md

Lines changed: 1 addition & 1 deletion

@@ -96,7 +96,7 @@ Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.o
 
 The ScalaTest plugin also supports running only a specific test suite as follows:
 
-    $ mvn -Dhadoop.version=... -Dsuites=org.apache.spark.repl.ReplSuite test
+    $ mvn -Dhadoop.version=... -DwildcardSuites=org.apache.spark.repl.ReplSuite test
 
 
 ## Continuous Compilation ##

docs/running-on-yarn.md

Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@ For example:
       --driver-memory 4g \
       --executor-memory 2g \
       --executor-cores 1
-      examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
+      lib/spark-examples*.jar \
       yarn-cluster 5
 
 The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the "Viewing Logs" section below for how to see driver and executor logs.

make-distribution.sh

Lines changed: 2 additions & 0 deletions

@@ -40,6 +40,8 @@
 #
 
 set -o pipefail
+set -e
+
 # Figure out where the Spark framework is installed
 FWDIR="$(cd `dirname $0`; pwd)"
 DISTDIR="$FWDIR/dist"

sbin/spark-executor

Lines changed: 3 additions & 0 deletions

@@ -19,5 +19,8 @@
 
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
+export PYTHONPATH=$FWDIR/python:$PYTHONPATH
+export PYTHONPATH=$FWDIR/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
+
 echo "Running spark-executor with framework dir = $FWDIR"
 exec $FWDIR/bin/spark-class org.apache.spark.executor.MesosExecutorBackend

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 3 additions & 3 deletions

@@ -35,10 +35,10 @@ private[spark] class YarnClientSchedulerBackend(
 
   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
       arrayBuf: ArrayBuffer[String]) {
-    if (System.getProperty(sysProp) != null) {
-      arrayBuf += (optionName, System.getProperty(sysProp))
-    } else if (System.getenv(envVar) != null) {
+    if (System.getenv(envVar) != null) {
       arrayBuf += (optionName, System.getenv(envVar))
+    } else if (sc.getConf.contains(sysProp)) {
+      arrayBuf += (optionName, sc.getConf.get(sysProp))
     }
   }
 
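The argument builder now prefers the environment variable and falls back to the `SparkConf` entry, instead of reading a raw system property first. A hypothetical standalone sketch of that lookup order; `SPARK_EXECUTOR_MEMORY` and `spark.executor.memory` are used here only as illustrative names:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.SparkConf

    object AddArgSketch {
      // Mirrors the new precedence: environment variable first, then the SparkConf entry.
      def addArg(optionName: String, envVar: String, sysProp: String,
          conf: SparkConf, arrayBuf: ArrayBuffer[String]): Unit = {
        if (System.getenv(envVar) != null) {
          arrayBuf += (optionName, System.getenv(envVar))
        } else if (conf.contains(sysProp)) {
          arrayBuf += (optionName, conf.get(sysProp))
        }
      }

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false).set("spark.executor.memory", "2g")
        val buf = new ArrayBuffer[String]()
        addArg("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory", conf, buf)
        println(buf.mkString(" "))  // "--executor-memory 2g" unless the env var is set
      }
    }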