Commit 4cc918a

Merge pull request #7 from markhamstra/master-csd
Rebased to AMPLab master
2 parents (928aa6f + 4e5860d) · commit 4cc918a

File tree: 59 files changed (+1389, -940 lines)


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -36,4 +36,5 @@ streaming-tests.log
 dependency-reduced-pom.xml
 .ensime
 .ensime_lucene
+checkpoint
 derby.log

bin/compute-classpath.cmd

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+@echo off
+
+rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
+rem script and the ExecutorRunner in standalone cluster mode.
+
+set SCALA_VERSION=2.9.3
+
+rem Figure out where the Spark framework is installed
+set FWDIR=%~dp0..\
+
+rem Load environment variables from conf\spark-env.cmd, if it exists
+if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+
+set CORE_DIR=%FWDIR%core
+set REPL_DIR=%FWDIR%repl
+set EXAMPLES_DIR=%FWDIR%examples
+set BAGEL_DIR=%FWDIR%bagel
+set STREAMING_DIR=%FWDIR%streaming
+set PYSPARK_DIR=%FWDIR%python
+
+rem Build up classpath
+set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes
+set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources
+set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\classes;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\test-classes
+set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\lib\org\apache\kafka\kafka\0.7.2-spark\*
+set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes
+set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\jars\*
+set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\*
+set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\*
+set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
+set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
+
+rem Add hadoop conf dir - else FileSystem.*, etc fail
+rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
+rem the configuration files.
+if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
+  set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
+:no_hadoop_conf_dir
+
+if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
+  set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
+:no_yarn_conf_dir
+
+rem Add Scala standard library
+set CLASSPATH=%CLASSPATH%;%SCALA_HOME%\lib\scala-library.jar;%SCALA_HOME%\lib\scala-compiler.jar;%SCALA_HOME%\lib\jline.jar
+
+rem A bit of a hack to allow calling this script within run2.cmd without seeing output
+if "%DONT_PRINT_CLASSPATH%"=="1" goto exit
+
+echo %CLASSPATH%
+
+:exit

bin/compute-classpath.sh

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+#!/bin/bash
+
+# This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
+# script and the ExecutorRunner in standalone cluster mode.
+
+SCALA_VERSION=2.9.3
+
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
+
+# Load environment variables from conf/spark-env.sh, if it exists
+if [ -e $FWDIR/conf/spark-env.sh ] ; then
+  . $FWDIR/conf/spark-env.sh
+fi
+
+CORE_DIR="$FWDIR/core"
+REPL_DIR="$FWDIR/repl"
+REPL_BIN_DIR="$FWDIR/repl-bin"
+EXAMPLES_DIR="$FWDIR/examples"
+BAGEL_DIR="$FWDIR/bagel"
+STREAMING_DIR="$FWDIR/streaming"
+PYSPARK_DIR="$FWDIR/python"
+
+# Build up classpath
+CLASSPATH="$SPARK_CLASSPATH"
+CLASSPATH="$CLASSPATH:$FWDIR/conf"
+CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes"
+if [ -n "$SPARK_TESTING" ] ; then
+  CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
+  CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
+fi
+CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources"
+CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
+if [ -e "$FWDIR/lib_managed" ]; then
+  CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*"
+  CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*"
+fi
+CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
+# Add the shaded JAR for Maven builds
+if [ -e $REPL_BIN_DIR/target ]; then
+  for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
+    CLASSPATH="$CLASSPATH:$jar"
+  done
+  # The shaded JAR doesn't contain examples, so include those separately
+  EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
+  CLASSPATH+=":$EXAMPLES_JAR"
+fi
+CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
+for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
+  CLASSPATH="$CLASSPATH:$jar"
+done
+
+# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
+# to avoid the -sources and -doc packages that are built by publish-local.
+if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
+  # Use the JAR from the SBT build
+  export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
+fi
+if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
+  # Use the JAR from the Maven build
+  export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
+fi
+
+# Add hadoop conf dir - else FileSystem.*, etc fail !
+# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
+# the configuration files.
+if [ "x" != "x$HADOOP_CONF_DIR" ]; then
+  CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR"
+fi
+if [ "x" != "x$YARN_CONF_DIR" ]; then
+  CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
+fi
+
+# Add Scala standard library
+if [ -z "$SCALA_LIBRARY_PATH" ]; then
+  if [ -z "$SCALA_HOME" ]; then
+    echo "SCALA_HOME is not set" >&2
+    exit 1
+  fi
+  SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
+fi
+CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar"
+CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar"
+CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar"
+
+echo "$CLASSPATH"
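
Both classpath scripts are meant to have their stdout captured by a launcher and passed to java -cp rather than run interactively. A minimal Scala sketch of that pattern, assuming an illustrative /opt/spark install path that is not part of this commit:

import scala.sys.process._

object ComputeClasspathExample {
  def main(args: Array[String]) {
    // Illustrative Spark home; a real caller would already know its own install directory.
    val sparkHome = "/opt/spark"
    // Run the script and capture the single colon-separated line it prints to stdout.
    val classpath = Seq(sparkHome + "/bin/compute-classpath.sh").!!.trim
    // The captured value can then be handed to `java -cp` when launching a JVM.
    println("java -cp " + classpath + " <main class>")
  }
}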

conf/spark-env.sh.template

Lines changed: 6 additions & 12 deletions
@@ -3,23 +3,17 @@
 # This file contains environment variables required to run Spark. Copy it as
 # spark-env.sh and edit that to configure Spark for your site. At a minimum,
 # the following two variables should be set:
-# - MESOS_NATIVE_LIBRARY, to point to your Mesos native library (libmesos.so)
-# - SCALA_HOME, to point to your Scala installation
+# - SCALA_HOME, to point to your Scala installation, or SCALA_LIBRARY_PATH to
+#   point to the directory for Scala library JARs (if you install Scala as a
+#   Debian or RPM package, these are in a separate path, often /usr/share/java)
+# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
 #
 # If using the standalone deploy mode, you can also set variables for it:
 # - SPARK_MASTER_IP, to bind the master to a different IP address
 # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
 # - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
 # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
-# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes to be spawned on every slave machine
-#
-# Finally, Spark also relies on the following variables, but these can be set
-# on just the *master* (i.e. in your driver program), and will automatically
-# be propagated to workers:
-# - SPARK_MEM, to change the amount of memory used per node (this should
-#   be in the same format as the JVM's -Xmx option, e.g. 300m or 1g)
-# - SPARK_CLASSPATH, to add elements to Spark's classpath
-# - SPARK_JAVA_OPTS, to add JVM options
-# - SPARK_LIBRARY_PATH, to add extra search paths for native libraries.
+# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes
+#   to be spawned on every slave machine

core/src/main/java/spark/network/netty/FileClient.java

Lines changed: 9 additions & 19 deletions
@@ -8,15 +8,20 @@
 import io.netty.channel.oio.OioEventLoopGroup;
 import io.netty.channel.socket.oio.OioSocketChannel;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class FileClient {
 
+  private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
   private FileClientHandler handler = null;
   private Channel channel = null;
   private Bootstrap bootstrap = null;
+  private int connectTimeout = 60*1000; // 1 min
 
-  public FileClient(FileClientHandler handler) {
+  public FileClient(FileClientHandler handler, int connectTimeout) {
     this.handler = handler;
+    this.connectTimeout = connectTimeout;
   }
 
   public void init() {
@@ -25,25 +30,10 @@ public void init() {
       .channel(OioSocketChannel.class)
       .option(ChannelOption.SO_KEEPALIVE, true)
       .option(ChannelOption.TCP_NODELAY, true)
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
       .handler(new FileClientChannelInitializer(handler));
   }
 
-  public static final class ChannelCloseListener implements ChannelFutureListener {
-    private FileClient fc = null;
-
-    public ChannelCloseListener(FileClient fc){
-      this.fc = fc;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) {
-      if (fc.bootstrap!=null){
-        fc.bootstrap.shutdown();
-        fc.bootstrap = null;
-      }
-    }
-  }
-
   public void connect(String host, int port) {
     try {
       // Start the connection attempt.
@@ -58,8 +48,8 @@ public void connect(String host, int port) {
   public void waitForClose() {
     try {
       channel.closeFuture().sync();
-    } catch (InterruptedException e){
-      e.printStackTrace();
+    } catch (InterruptedException e) {
+      LOG.warn("FileClient interrupted", e);
    }
  }
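
A hedged sketch of how a caller in the same package might use the new two-argument constructor and the new handleError callback; the handler body, host and port are illustrative assumptions, not Spark's actual shuffle-copy code:

package spark.network.netty

import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext

object FileClientSketch {
  def main(args: Array[String]) {
    val handler = new FileClientHandler {
      override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) {
        // A real handler would copy the block bytes out; here we just drop them.
        in.skipBytes(header.fileLen())
      }
      override def handleError(blockId: String) {
        System.err.println("Failed to fetch block " + blockId)
      }
    }
    // Bound connection attempts at 60 s, matching the field's default above.
    val client = new FileClient(handler, 60 * 1000)
    client.init()
    client.connect("localhost", 12345) // hypothetical file-server address
    client.waitForClose()
  }
}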

core/src/main/java/spark/network/netty/FileClientHandler.java

Lines changed: 8 additions & 0 deletions
@@ -9,7 +9,14 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
 
   private FileHeader currentHeader = null;
 
+  private volatile boolean handlerCalled = false;
+
+  public boolean isComplete() {
+    return handlerCalled;
+  }
+
   public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header);
+  public abstract void handleError(String blockId);
 
   @Override
   public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
@@ -26,6 +33,7 @@ public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
     // get file
     if(in.readableBytes() >= currentHeader.fileLen()) {
       handle(ctx, in, currentHeader);
+      handlerCalled = true;
       currentHeader = null;
       ctx.close();
     }

core/src/main/scala/spark/PairRDDFunctions.scala

Lines changed: 12 additions & 3 deletions
@@ -1,5 +1,6 @@
 package spark
 
+import java.nio.ByteBuffer
 import java.util.{Date, HashMap => JHashMap}
 import java.text.SimpleDateFormat
 
@@ -64,8 +65,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
         throw new SparkException("Default partitioner cannot partition array keys.")
       }
     }
-    val aggregator =
-      new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     if (self.partitioner == Some(partitioner)) {
       self.mapPartitions(aggregator.combineValuesByKey(_), true)
     } else if (mapSideCombine) {
@@ -97,7 +97,16 @@
    * list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
-    combineByKey[V]({v: V => func(zeroValue, v)}, func, func, partitioner)
+    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
+    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+    val zeroArray = new Array[Byte](zeroBuffer.limit)
+    zeroBuffer.get(zeroArray)
+
+    // When deserializing, use a lazy val to create just one instance of the serializer per task
+    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+
+    combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
   }
 
   /**
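
The per-key clone of the zero value matters mainly for mutable zeros, where reusing one shared instance would let different keys mutate the same object. A minimal sketch, not part of the patch, using the partitioner overload shown above (the local master, data and ArrayBuffer zero are illustrative):

import scala.collection.mutable.ArrayBuffer
import spark.{HashPartitioner, SparkContext}
import spark.SparkContext._

object FoldByKeyZeroValueSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "foldByKeySketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    // Fold each key's values into a buffer. Because foldByKey now deserializes a
    // fresh copy of the zero value per key, keys "a" and "b" never append into
    // the same ArrayBuffer instance.
    val folded = pairs
      .mapValues(v => ArrayBuffer(v))
      .foldByKey(ArrayBuffer[Int](), new HashPartitioner(2))(_ ++= _)
    folded.collect().foreach(println)
    sc.stop()
  }
}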

core/src/main/scala/spark/SparkContext.scala

Lines changed: 16 additions & 8 deletions
@@ -115,13 +115,14 @@ class SparkContext(
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
   // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
-  for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
-      "SPARK_TESTING")) {
+  for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
     val value = System.getenv(key)
     if (value != null) {
       executorEnvs(key) = value
     }
   }
+  // Since memory can be set with a system property too, use that
+  executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
   if (environment != null) {
     executorEnvs ++= environment
   }
@@ -156,14 +157,12 @@
         scheduler
 
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
+        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
-        val sparkMemEnv = System.getenv("SPARK_MEM")
-        val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
-        if (sparkMemEnvInt > memoryPerSlaveInt) {
+        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
           throw new SparkException(
-            "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
-              memoryPerSlaveInt, sparkMemEnvInt))
+            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
+              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
         }
 
         val scheduler = new ClusterScheduler(this)
@@ -881,6 +880,15 @@ object SparkContext {
 
   /** Find the JAR that contains the class of a particular object */
   def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
+
+  /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
+  private[spark] val executorMemoryRequested = {
+    // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+    Option(System.getProperty("spark.executor.memory"))
+      .orElse(Option(System.getenv("SPARK_MEM")))
+      .map(Utils.memoryStringToMb)
+      .getOrElse(512)
+  }
 }
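
With this change, executor memory is resolved from the spark.executor.memory system property, falling back to SPARK_MEM and then to 512 MB, and the resolved value is re-exported to executors as SPARK_MEM. A hedged usage sketch (the local-cluster sizing is illustrative and assumes a built Spark distribution, since that mode launches real worker processes):

import spark.SparkContext

object ExecutorMemorySketch {
  def main(args: Array[String]) {
    // Takes precedence over SPARK_MEM; the resolved value is forwarded to
    // executors via executorEnvs("SPARK_MEM") as shown in the diff above.
    System.setProperty("spark.executor.memory", "1g")
    // local-cluster[slaves, coresPerSlave, memoryPerSlaveMB]: 1024 MB per worker
    // satisfies the executorMemoryRequested check above.
    val sc = new SparkContext("local-cluster[2,1,1024]", "memorySketch")
    println(sc.parallelize(1 to 1000).count())
    sc.stop()
  }
}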

core/src/main/scala/spark/SparkEnv.scala

Lines changed: 2 additions & 1 deletion
@@ -59,7 +59,8 @@ class SparkEnv (
 
   def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
     synchronized {
-      pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorkerFactory(pythonExec, envVars)).create()
+      val key = (pythonExec, envVars)
+      pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
     }
   }