
Commit b452680
Merge branch 'master' of https://github.com/apache/spark into format_pom
witgo committed Apr 29, 2014
2 parents bee920d + 9f7a095 commit b452680
Showing 37 changed files with 1,029 additions and 295 deletions.
bin/compute-classpath.sh (2 changes: 1 addition & 1 deletion)
@@ -28,7 +28,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
 . $FWDIR/bin/load-spark-env.sh
 
 # Build up classpath
-CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"
+CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"
 
 ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"
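Note: the new $SPARK_SUBMIT_CLASSPATH entry is exported by bin/spark-submit (diff below) when --driver-class-path is given. A minimal sketch of the flow, with a hypothetical path, not part of the commit:

    # in bin/spark-submit: --driver-class-path /opt/extra/jars/foo.jar leads to
    export SPARK_SUBMIT_CLASSPATH=/opt/extra/jars/foo.jar   # hypothetical path
    # compute-classpath.sh then splices it into the JVM classpath:
    CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"

When the variable is unset, the expansion leaves an empty entry between the two colons, which the JVM resolves as the current working directory.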
bin/spark-class (9 changes: 5 additions & 4 deletions)
@@ -73,11 +73,13 @@ case "$1" in
     OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
     ;;
 
-  # All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
-  'org.apache.spark.repl.Main')
-    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
+  # Spark submit uses SPARK_SUBMIT_OPTS and SPARK_JAVA_OPTS
+  'org.apache.spark.deploy.SparkSubmit')
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS \
+      -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
+    OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
     ;;
 
   *)
     OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
     OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
@@ -98,7 +100,6 @@ fi
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$OUR_JAVA_OPTS"
-JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$_SPARK_LIBRARY_PATH"
 JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e "$FWDIR/conf/java-opts" ] ; then
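Together with the removal of the blanket -Djava.library.path assignment in the second hunk, library-path handling now happens only in the SparkSubmit case. An illustrative trace with assumed values, not part of the commit:

    SPARK_JAVA_OPTS="-Dspark.foo=bar"          # hypothetical user setting
    SPARK_SUBMIT_OPTS="-Djline.terminal=unix"  # e.g. exported by bin/spark-shell
    SPARK_SUBMIT_LIBRARY_PATH=/opt/native/lib  # set via --driver-library-path
    # OUR_JAVA_OPTS for the SparkSubmit entry point then expands to:
    #   -Dspark.foo=bar -Djline.terminal=unix -Djava.library.path=/opt/native/lib
    # with the -Xms/-Xmx heap flags appended afterwards by spark-class.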
bin/spark-shell (11 changes: 5 additions & 6 deletions)
@@ -20,7 +20,6 @@
 #
 # Shell script for starting the Spark Shell REPL
 
-args="$@"
 cygwin=false
 case "`uname`" in
   CYGWIN*) cygwin=true;;
@@ -46,12 +45,12 @@ function main(){
     # "Backspace sends ^H" setting in "Keys" section of the Mintty options
     # (see https://github.com/sbt/sbt/issues/562).
     stty -icanon min 1 -echo > /dev/null 2>&1
-    export SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Djline.terminal=unix"
-    $FWDIR/bin/spark-submit spark-internal "$args" --class org.apache.spark.repl.Main
+    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
+    $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
     stty icanon echo > /dev/null 2>&1
   else
-    export SPARK_REPL_OPTS
-    $FWDIR/bin/spark-submit spark-internal "$args" --class org.apache.spark.repl.Main
+    export SPARK_SUBMIT_OPTS
+    $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
   fi
 }
 
@@ -83,7 +82,7 @@ if [[ ! $? ]]; then
   saved_stty=""
 fi
 
-main
+main "$@"
 
 # record the exit status lest it be overwritten:
 # then reenable echo and propagate the code.
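The args="$@" removal fixes a quoting bug: assigning "$@" to a scalar variable collapses all arguments into a single string, and the later unquoted $args expansion re-splits it on whitespace. A standalone demonstration, independent of Spark:

    set -- --name "my app"     # simulate two positional arguments
    args="$@"                  # collapses them into one string: --name my app
    printf '%s\n' $args        # re-splits into three words: --name, my, app
    printf '%s\n' "$@"         # preserves the original two arguments

Passing main "$@" (and "$@" again inside main) keeps each argument intact all the way to spark-submit.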
bin/spark-submit (6 changes: 3 additions & 3 deletions)
@@ -26,11 +26,11 @@ while (($#)); do
   elif [ "$1" = "--driver-memory" ]; then
     DRIVER_MEMORY=$2
   elif [ "$1" = "--driver-library-path" ]; then
-    export _SPARK_LIBRARY_PATH=$2
+    export SPARK_SUBMIT_LIBRARY_PATH=$2
   elif [ "$1" = "--driver-class-path" ]; then
-    export SPARK_CLASSPATH="$SPARK_CLASSPATH:$2"
+    export SPARK_SUBMIT_CLASSPATH=$2
   elif [ "$1" = "--driver-java-options" ]; then
-    export SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $2"
+    export SPARK_SUBMIT_OPTS=$2
   fi
   shift
 done
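Each driver flag now maps to its own SPARK_SUBMIT_* variable instead of being appended to the shared SPARK_CLASSPATH / SPARK_JAVA_OPTS, so spark-submit settings no longer leak into other entry points. Note the semantics also change from append to overwrite (e.g. SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $2" becomes SPARK_SUBMIT_OPTS=$2). A hypothetical invocation showing the mapping (paths, options, and class names are placeholders):

    # --driver-library-path  ->  SPARK_SUBMIT_LIBRARY_PATH
    # --driver-class-path    ->  SPARK_SUBMIT_CLASSPATH
    # --driver-java-options  ->  SPARK_SUBMIT_OPTS
    ./bin/spark-submit \
      --driver-library-path /opt/native/lib \
      --driver-class-path /opt/extra/jars/foo.jar \
      --driver-java-options "-XX:+UseCompressedOops" \
      --class org.example.MyApp \
      myapp.jar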
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (14 changes: 13 additions & 1 deletion)
@@ -23,6 +23,7 @@ import java.net.{URI, URL}
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
 import org.apache.spark.executor.ExecutorURLClassLoader
+import org.apache.spark.util.Utils
 
 /**
  * Scala code behind the spark-submit script. The script handles setting up the classpath with
@@ -128,6 +129,18 @@ object SparkSubmit {
       childArgs += ("--class", appArgs.mainClass)
     }
 
+    if (clusterManager == YARN) {
+      // The choice of class is arbitrary, could use any spark-yarn class
+      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
+        val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " +
+          "with YARN support."
+        throw new Exception(msg)
+      }
+    }
+
+    // Special flag to avoid deprecation warnings at the client
+    sysProps("SPARK_SUBMIT") = "true"
+
     val options = List[OptionAssigner](
       new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
      new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
@@ -185,7 +198,6 @@
     if (clusterManager == STANDALONE) {
       val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
       sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
-      println("SPARK JARS" + sysProps.get("spark.jars"))
     }
 
     if (deployOnCluster && clusterManager == STANDALONE) {
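The classIsLoadable probe turns a missing YARN build into a fast, explicit failure at submit time instead of a ClassNotFoundException later. An illustrative session against a Spark build compiled without YARN support (command shape and paths assumed; the message text comes from the diff above):

    export HADOOP_CONF_DIR=/etc/hadoop/conf   # satisfies the YARN env check
    ./bin/spark-submit --master yarn --class org.example.MyApp myapp.jar
    # Exception in thread "main" java.lang.Exception: Could not load YARN
    # classes. This copy of Spark may not have been compiled with YARN support.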
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala (230 changes: 117 additions & 113 deletions)
@@ -21,14 +21,15 @@ import java.io.{File, FileInputStream, IOException}
 import java.util.Properties
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ArrayBuffer}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.SparkException
+import org.apache.spark.util.Utils
 
 /**
  * Parses and encapsulates arguments from the spark-submit script.
  */
-private[spark] class SparkSubmitArguments(args: Array[String]) {
+private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -118,8 +119,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
 
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
-      val testing = sys.env.contains("SPARK_TESTING")
-      if (!hasHadoopEnv && !testing) {
+      if (!hasHadoopEnv && !Utils.isTesting) {
        throw new Exception(s"When running with master '$master' " +
          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
      }
@@ -156,119 +156,121 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     """.stripMargin
   }
 
-  private def parseOpts(opts: List[String]): Unit = opts match {
-    case ("--name") :: value :: tail =>
-      name = value
-      parseOpts(tail)
-
-    case ("--master") :: value :: tail =>
-      master = value
-      parseOpts(tail)
-
-    case ("--class") :: value :: tail =>
-      mainClass = value
-      parseOpts(tail)
-
-    case ("--deploy-mode") :: value :: tail =>
-      if (value != "client" && value != "cluster") {
-        SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
-      }
-      deployMode = value
-      parseOpts(tail)
-
-    case ("--num-executors") :: value :: tail =>
-      numExecutors = value
-      parseOpts(tail)
-
-    case ("--total-executor-cores") :: value :: tail =>
-      totalExecutorCores = value
-      parseOpts(tail)
-
-    case ("--executor-cores") :: value :: tail =>
-      executorCores = value
-      parseOpts(tail)
-
-    case ("--executor-memory") :: value :: tail =>
-      executorMemory = value
-      parseOpts(tail)
-
-    case ("--driver-memory") :: value :: tail =>
-      driverMemory = value
-      parseOpts(tail)
-
-    case ("--driver-cores") :: value :: tail =>
-      driverCores = value
-      parseOpts(tail)
-
-    case ("--driver-class-path") :: value :: tail =>
-      driverExtraClassPath = value
-      parseOpts(tail)
-
-    case ("--driver-java-options") :: value :: tail =>
-      driverExtraJavaOptions = value
-      parseOpts(tail)
-
-    case ("--driver-library-path") :: value :: tail =>
-      driverExtraLibraryPath = value
-      parseOpts(tail)
-
-    case ("--properties-file") :: value :: tail =>
-      propertiesFile = value
-      parseOpts(tail)
-
-    case ("--supervise") :: tail =>
-      supervise = true
-      parseOpts(tail)
-
-    case ("--queue") :: value :: tail =>
-      queue = value
-      parseOpts(tail)
-
-    case ("--files") :: value :: tail =>
-      files = value
-      parseOpts(tail)
-
-    case ("--archives") :: value :: tail =>
-      archives = value
-      parseOpts(tail)
-
-    case ("--arg") :: value :: tail =>
-      childArgs += value
-      parseOpts(tail)
-
-    case ("--jars") :: value :: tail =>
-      jars = value
-      parseOpts(tail)
-
-    case ("--help" | "-h") :: tail =>
-      printUsageAndExit(0)
-
-    case ("--verbose" | "-v") :: tail =>
-      verbose = true
-      parseOpts(tail)
-
-    case value :: tail =>
-      if (value.startsWith("-")) {
-        val errMessage = s"Unrecognized option '$value'."
-        val suggestion: Option[String] = value match {
-          case v if v.startsWith("--") && v.contains("=") =>
-            val parts = v.split("=")
-            Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?")
-          case _ =>
-            None
-        }
-        SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
-      }
-
-      if (primaryResource != null) {
-        val error = s"Found two conflicting resources, $value and $primaryResource." +
-          " Expecting only one resource."
-        SparkSubmit.printErrorAndExit(error)
-      }
-      primaryResource = value
-      parseOpts(tail)
-
-    case Nil =>
-  }
+  /** Fill in values by parsing user options. */
+  private def parseOpts(opts: Seq[String]): Unit = {
+    // Delineates parsing of Spark options from parsing of user options.
+    var inSparkOpts = true
+    parse(opts)
+
+    def parse(opts: Seq[String]): Unit = opts match {
+      case ("--name") :: value :: tail =>
+        name = value
+        parse(tail)
+
+      case ("--master") :: value :: tail =>
+        master = value
+        parse(tail)
+
+      case ("--class") :: value :: tail =>
+        mainClass = value
+        parse(tail)
+
+      case ("--deploy-mode") :: value :: tail =>
+        if (value != "client" && value != "cluster") {
+          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
+        }
+        deployMode = value
+        parse(tail)
+
+      case ("--num-executors") :: value :: tail =>
+        numExecutors = value
+        parse(tail)
+
+      case ("--total-executor-cores") :: value :: tail =>
+        totalExecutorCores = value
+        parse(tail)
+
+      case ("--executor-cores") :: value :: tail =>
+        executorCores = value
+        parse(tail)
+
+      case ("--executor-memory") :: value :: tail =>
+        executorMemory = value
+        parse(tail)
+
+      case ("--driver-memory") :: value :: tail =>
+        driverMemory = value
+        parse(tail)
+
+      case ("--driver-cores") :: value :: tail =>
+        driverCores = value
+        parse(tail)
+
+      case ("--driver-class-path") :: value :: tail =>
+        driverExtraClassPath = value
+        parse(tail)
+
+      case ("--driver-java-options") :: value :: tail =>
+        driverExtraJavaOptions = value
+        parse(tail)
+
+      case ("--driver-library-path") :: value :: tail =>
+        driverExtraLibraryPath = value
+        parse(tail)
+
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
+      case ("--supervise") :: tail =>
+        supervise = true
+        parse(tail)
+
+      case ("--queue") :: value :: tail =>
+        queue = value
+        parse(tail)
+
+      case ("--files") :: value :: tail =>
+        files = value
+        parse(tail)
+
+      case ("--archives") :: value :: tail =>
+        archives = value
+        parse(tail)
+
+      case ("--jars") :: value :: tail =>
+        jars = value
+        parse(tail)
+
+      case ("--help" | "-h") :: tail =>
+        printUsageAndExit(0)
+
+      case ("--verbose" | "-v") :: tail =>
+        verbose = true
+        parse(tail)
+
+      case value :: tail =>
+        if (inSparkOpts) {
+          value match {
+            // convert --foo=bar to --foo bar
+            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
+              val parts = v.split("=")
+              parse(Seq(parts(0), parts(1)) ++ tail)
+            case v if v.startsWith("-") =>
+              val errMessage = s"Unrecognized option '$value'."
+              SparkSubmit.printErrorAndExit(errMessage)
+            case v =>
+              primaryResource = v
+              inSparkOpts = false
+              parse(tail)
+          }
+        } else {
+          childArgs += value
+          parse(tail)
+        }
+
+      case Nil =>
+    }
+  }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
@@ -277,7 +279,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
     outStream.println(
-      """Usage: spark-submit <app jar> [options]
+      """Usage: spark-submit [options] <app jar> [app options]
         |Options:
         |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
         |  --deploy-mode DEPLOY_MODE   Mode to deploy the app in, either 'client' or 'cluster'.
@@ -296,7 +298,9 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
         |  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 512M).
         |  --driver-java-options       Extra Java options to pass to the driver
         |  --driver-library-path       Extra library path entries to pass to the driver
-        |  --driver-class-path         Extra class path entries to pass to the driver
+        |  --driver-class-path         Extra class path entries to pass to the driver. Note that
+        |                              jars added with --jars are automatically included in the
+        |                              classpath.
         |
         |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |
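The reworked parser treats the first bare (non-option) argument as the primary resource, stops consuming Spark options there, and forwards everything after it to the application; it also rewrites --foo=bar into --foo bar. That is why the usage string changes to [options] <app jar> [app options]. A hypothetical invocation under the new rules:

    ./bin/spark-submit --master local[2] --name=demo myapp.jar --input /data
    # --master local[2]   consumed as a Spark option (inSparkOpts is true)
    # --name=demo         rewritten to "--name demo" before matching
    # myapp.jar           becomes primaryResource; Spark option parsing stops
    # --input /data       appended to childArgs and passed to the application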