
Commit

Merge branch 'master' of https://github.com/apache/spark into format_pom
witgo committed Apr 24, 2014
2 parents 62d0862 + 640f9a0 commit 77fad08
Showing 110 changed files with 2,448 additions and 1,183 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -33,7 +33,7 @@

<properties>
<spark.jar.dir>scala-${scala.binary.version}</spark.jar.dir>
- <spark.jar.basename>${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</spark.jar.basename>
+ <spark.jar.basename>spark-assembly-${project.version}-hadoop${hadoop.version}.jar</spark.jar.basename>
<spark.jar>${project.build.directory}/${spark.jar.dir}/${spark.jar.basename}</spark.jar>
<deb.pkg.name>spark</deb.pkg.name>
<deb.install.path>/usr/share/spark</deb.install.path>
4 changes: 2 additions & 2 deletions bin/compute-classpath.sh
@@ -50,9 +50,9 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
else
# Else use spark-assembly jar from either RELEASE or assembly directory
if [ -f "$FWDIR/RELEASE" ]; then
- ASSEMBLY_JAR=`ls "$FWDIR"/jars/spark*-assembly*.jar`
+ ASSEMBLY_JAR=`ls "$FWDIR"/lib/spark-assembly*hadoop*.jar`
else
- ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark*-assembly*hadoop*.jar`
+ ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*.jar`
fi
CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
fi
11 changes: 7 additions & 4 deletions bin/run-example
@@ -40,12 +40,15 @@ fi
# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/examples
+ SPARK_EXAMPLES_JAR=""
- if [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar ]; then
-   export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar`
+
+ if [ -f "$FWDIR/RELEASE" ]; then
+   export SPARK_EXAMPLES_JAR=`ls "$FWDIR"/lib/spark-examples-*hadoop*.jar`
+ elif [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar ]; then
+   export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar`
fi

if [[ -z $SPARK_EXAMPLES_JAR ]]; then
-   echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
+   echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2
echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
exit 1
fi
2 changes: 2 additions & 0 deletions conf/spark-env.sh.template
@@ -5,6 +5,7 @@

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
+ # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append
@@ -17,6 +18,7 @@
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client mode
+ # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -247,7 +247,7 @@
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>pyrolite</artifactId>
- <version>2.0</version>
+ <version>2.0.1</version>
</dependency>
</dependencies>
<build>
67 changes: 41 additions & 26 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val ui = new SparkUI(this)
ui.bind()

+ /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+ val hadoopConfiguration: Configuration = {
+   val env = SparkEnv.get
+   val hadoopConf = SparkHadoopUtil.get.newConfiguration()
+   // Explicitly check for S3 environment variables
+   if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+       System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+     hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+     hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+     hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+     hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+   }
+   // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+   conf.getAll.foreach { case (key, value) =>
+     if (key.startsWith("spark.hadoop.")) {
+       hadoopConf.set(key.substring("spark.hadoop.".length), value)
+     }
+   }
+   val bufferSize = conf.get("spark.buffer.size", "65536")
+   hadoopConf.set("io.file.buffer.size", bufferSize)
+   hadoopConf
+ }
+
// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
-     val logger = new EventLoggingListener(appName, conf)
+     val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
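
The added block above is the hadoopConfiguration definition moved earlier in SparkContext's initialization (its old location is removed below). One contract worth noting: any spark.hadoop.* entry in the SparkConf is copied into the Hadoop Configuration with the prefix stripped, and spark.buffer.size (default 65536) is forwarded as io.file.buffer.size. A minimal sketch of relying on that from application code — the key foo.custom.setting is made up for illustration:

    import org.apache.spark.{SparkConf, SparkContext}

    // "spark.hadoop." is stripped before the copy; the key itself is hypothetical.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("hadoop-conf-demo")
      .set("spark.hadoop.foo.custom.setting", "bar")
    val sc = new SparkContext(conf)
    assert(sc.hadoopConfiguration.get("foo.custom.setting") == "bar")
    assert(sc.hadoopConfiguration.get("io.file.buffer.size") == "65536")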
@@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging {
postEnvironmentUpdate()
postApplicationStart()

- /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
- val hadoopConfiguration: Configuration = {
-   val env = SparkEnv.get
-   val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-   // Explicitly check for S3 environment variables
-   if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-       System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-     hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-     hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-     hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-     hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-   }
-   // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-   conf.getAll.foreach { case (key, value) =>
-     if (key.startsWith("spark.hadoop.")) {
-       hadoopConf.set(key.substring("spark.hadoop.".length), value)
-     }
-   }
-   val bufferSize = conf.get("spark.buffer.size", "65536")
-   hadoopConf.set("io.file.buffer.size", bufferSize)
-   hadoopConf
- }

private[spark] var checkpointDir: Option[String] = None

// Thread Local variable that can be used by users to pass information down the stack
@@ -381,16 +381,27 @@ class SparkContext(config: SparkConf) extends Logging {
* // In a separate thread:
* sc.cancelJobGroup("some_job_to_cancel")
* }}}
+  *
+  * If interruptOnCancel is set to true for the job group, then job cancellation will result
+  * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+  * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+  * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
*/
- def setJobGroup(groupId: String, description: String) {
+ def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+   // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
+   // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
+   // APIs to also take advantage of this property (e.g., internal job failures or canceling from
+   // JobProgressTab UI) on a per-job basis.
+   setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
}

/** Clear the current thread's job group ID and its description. */
def clearJobGroup() {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
+   setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}

// Post init
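
For context, a short usage sketch of the new flag; the group name and job are illustrative, and sc is an existing SparkContext:

    // Tag all jobs submitted from this thread; allow cancellation to interrupt their tasks.
    sc.setJobGroup("nightly-etl", "ETL for 2014-04-24", interruptOnCancel = true)
    sc.parallelize(1 to 1000000).map(_ * 2).count()

    // In a separate thread: cancels the whole group, and because interruptOnCancel
    // was true, running tasks receive Thread.interrupt().
    sc.cancelJobGroup("nightly-etl")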
@@ -1244,6 +1255,8 @@ object SparkContext extends Logging {

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

+ private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"

private[spark] val SPARK_UNKNOWN_USER = "<unknown>"

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
@@ -1268,8 +1281,10 @@

// TODO: Add AccumulatorParams for other types, e.g. lists and strings

- implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
+ implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
new PairRDDFunctions(rdd)
+ }

implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
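
The reworked rddToPairRDDFunctions implicit keeps call sites unchanged while threading an optional Ordering[K] into PairRDDFunctions (defaulting to null when no ordering exists for K). A quick sketch, assuming an existing SparkContext sc:

    import org.apache.spark.SparkContext._  // brings rddToPairRDDFunctions into scope

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // The conversion to PairRDDFunctions now also picks up Ordering[String]:
    val counts = pairs.reduceByKey(_ + _)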

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -33,7 +33,7 @@ class TaskContext(
val attemptId: Long,
val runningLocally: Boolean = false,
@volatile var interrupted: Boolean = false,
- private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()
+ private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
) extends Serializable {

@deprecated("use partitionId", "0.8.1")
33 changes: 21 additions & 12 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,6 +24,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+ import scala.util.Try

import net.razorvine.pickle.{Pickler, Unpickler}

@@ -89,16 +90,22 @@ private[spark] class PythonRDD[T: ClassTag](
dataOut.flush()
worker.shutdownOutput()
} catch {
+
case e: java.io.FileNotFoundException =>
readerException = e
-   // Kill the Python worker process:
-   worker.shutdownOutput()
+   Try(worker.shutdownOutput()) // kill Python worker process
+
case e: IOException =>
// This can happen for legitimate reasons if the Python code stops returning data
-   // before we are done passing elements through, e.g., for take(). Just log a message
-   // to say it happened.
-   logInfo("stdin writer to Python finished early")
-   logDebug("stdin writer to Python finished early", e)
+   // before we are done passing elements through, e.g., for take(). Just log a message to
+   // say it happened (as it could also be hiding a real IOException from a data source).
+   logInfo("stdin writer to Python finished early (may not be an error)", e)
+
+ case e: Exception =>
+   // We must avoid throwing exceptions here, because the thread uncaught exception handler
+   // will kill the whole executor (see Executor).
+   readerException = e
+   Try(worker.shutdownOutput()) // kill Python worker process
}
}
}.start()
@@ -152,7 +159,7 @@ private[spark] class PythonRDD[T: ClassTag](
val exLength = stream.readInt()
val obj = new Array[Byte](exLength)
stream.readFully(obj)
- throw new PythonException(new String(obj))
+ throw new PythonException(new String(obj), readerException)
case SpecialLengths.END_OF_DATA_SECTION =>
// We've finished the data section of the output, but we can still
// read some accumulator updates:
@@ -162,15 +169,17 @@
val update = new Array[Byte](updateLen)
stream.readFully(update)
accumulator += Collections.singletonList(update)

}
Array.empty[Byte]
}
} catch {
- case eof: EOFException => {
+ case e: Exception if readerException != null =>
+   logError("Python worker exited unexpectedly (crashed)", e)
+   logError("Python crash may have been caused by prior exception:", readerException)
+   throw readerException
+
+ case eof: EOFException =>
throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
- }
- case e: Throwable => throw e
}
}

@@ -185,7 +194,7 @@
}

/** Thrown for exceptions in user Python code. */
- private class PythonException(msg: String) extends Exception(msg)
+ private class PythonException(msg: String, cause: Exception) extends RuntimeException(msg, cause)

/**
* Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
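
The writer-thread changes above use scala.util.Try purely to swallow secondary failures during cleanup, so the original readerException is the one that surfaces. The same pattern in isolation — a sketch, with the socket and exception as stand-ins:

    import java.net.Socket
    import scala.util.Try

    def failAndCleanUp(worker: Socket, original: Exception): Exception = {
      Try(worker.shutdownOutput())  // a failed close must not mask `original`
      original
    }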
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -196,7 +196,7 @@ object SparkSubmit {
childArgs ++= appArgs.childArgs
} else if (clusterManager == YARN) {
for (arg <- appArgs.childArgs) {
-   childArgs += ("--args", arg)
+   childArgs += ("--arg", arg)
}
}
}
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -116,6 +116,15 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
if (args.length == 0) printUsageAndExit(-1)
if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")

+ if (master.startsWith("yarn")) {
+   val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
+   val testing = sys.env.contains("SPARK_TESTING")
+   if (!hasHadoopEnv && !testing) {
+     throw new Exception(s"When running with master '$master' " +
+       "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
+   }
+ }
}

override def toString = {
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -98,6 +98,11 @@ class HistoryServer(
def initialize() {
attachPage(new HistoryPage(this))
attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
+ }
+
+ /** Bind to the HTTP server behind this web interface. */
+ override def bind() {
+   super.bind()
logCheckingThread.start()
}

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -69,12 +69,12 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}

- case KillTask(taskId, _) =>
+ case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
logError("Received KillTask command but executor was null")
System.exit(1)
} else {
-   executor.killTask(taskId)
+   executor.killTask(taskId, interruptThread)
}

case x: DisassociatedEvent =>
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -136,10 +136,10 @@ private[spark] class Executor(
threadPool.execute(tr)
}

- def killTask(taskId: Long) {
+ def killTask(taskId: Long, interruptThread: Boolean) {
val tr = runningTasks.get(taskId)
if (tr != null) {
-   tr.kill()
+   tr.kill(interruptThread)
}
}

@@ -166,11 +166,11 @@
@volatile private var killed = false
@volatile private var task: Task[Any] = _

- def kill() {
+ def kill(interruptThread: Boolean) {
logInfo("Executor is trying to kill task " + taskId)
killed = true
if (task != null) {
-   task.kill()
+   task.kill(interruptThread)
}
}

@@ -257,7 +257,7 @@ private[spark] class Executor(
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}

- case TaskKilledException => {
+ case TaskKilledException | _: InterruptedException if task.killed => {
logInfo("Executor killed task " + taskId)
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
}
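
Stripped of Spark's plumbing, the kill path added above is plain Java thread interruption. A self-contained sketch of the same idea — all names here are illustrative, not Spark API:

    class InterruptibleTask extends Runnable {
      @volatile private var killed = false
      @volatile private var runner: Thread = _

      override def run(): Unit = {
        runner = Thread.currentThread()
        try {
          while (!killed) Thread.sleep(100)  // stand-in for real task work
        } catch {
          // Mirrors the new `TaskKilledException | _: InterruptedException if task.killed` case:
          case _: InterruptedException if killed => println("task stopped promptly")
        }
      }

      def kill(interruptThread: Boolean): Unit = {
        killed = true
        // Interrupting wakes blocking calls so the kill takes effect quickly, at the
        // cost of the HDFS-1208 caveat documented in SparkContext.setJobGroup.
        if (interruptThread && runner != null) runner.interrupt()
      }
    }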
core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -76,7 +76,8 @@ private[spark] class MesosExecutorBackend
if (executor == null) {
logError("Received KillTask but executor was null")
} else {
-   executor.killTask(t.getValue.toLong)
+   // TODO: Determine the 'interruptOnCancel' property set for the given job.
+   executor.killTask(t.getValue.toLong, interruptThread = false)
}
}

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -84,7 +84,7 @@ class TaskMetrics extends Serializable {
}

private[spark] object TaskMetrics {
- def empty(): TaskMetrics = new TaskMetrics
+ def empty: TaskMetrics = new TaskMetrics
}


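
The TaskMetrics change is the usual Scala style point: a side-effect-free accessor drops its empty parameter list. Since the method is now declared without parentheses, call sites must use field-access syntax:

    val metrics = TaskMetrics.empty  // TaskMetrics.empty() no longer compiles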
(Diff truncated: the remaining changed files are not shown here.)
