
Commit e2f4ff9

Merge github.com:apache/spark
Conflicts: core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
2 parents 050419e + f1fa617

266 files changed, +7368 -1626 lines

bin/load-spark-env.sh

Lines changed: 3 additions & 0 deletions
@@ -30,6 +30,9 @@ if [ -z "$SPARK_ENV_LOADED" ]; then
   use_conf_dir=${SPARK_CONF_DIR:-"$parent_dir/conf"}
 
   if [ -f "${use_conf_dir}/spark-env.sh" ]; then
+    # Promote all variable declarations to environment (exported) variables
+    set -a
     . "${use_conf_dir}/spark-env.sh"
+    set +a
   fi
 fi

bin/pyspark

Lines changed: 2 additions & 1 deletion
@@ -55,7 +55,8 @@ if [ -n "$IPYTHON_OPTS" ]; then
   IPYTHON=1
 fi
 
-if [[ "$IPYTHON" = "1" ]] ; then
+# Only use ipython if no command line arguments were provided [SPARK-1134]
+if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
   exec ipython $IPYTHON_OPTS
 else
   exec "$PYSPARK_PYTHON" "$@"

bin/spark-shell

Lines changed: 2 additions & 2 deletions
@@ -127,7 +127,7 @@ function set_spark_log_conf(){
 
 function set_spark_master(){
   if ! [[ "$1" =~ $ARG_FLAG_PATTERN ]]; then
-    MASTER="$1"
+    export MASTER="$1"
   else
     out_error "wrong format for $2"
   fi
@@ -145,7 +145,7 @@ function resolve_spark_master(){
   fi
 
   if [ -z "$MASTER" ]; then
-    MASTER="$DEFAULT_MASTER"
+    export MASTER="$DEFAULT_MASTER"
   fi
 
 }

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -150,7 +150,7 @@
       <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
       <version>3.2.6</version>
       <!-- see also exclusion for lift-json; this is necessary since it depends on
-           scala-library and scalap 2.10.0, but we use 2.10.3, and only override
+           scala-library and scalap 2.10.0, but we use 2.10.4, and only override
           scala-library -->
       <exclusions>
         <exclusion>

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 34 additions & 0 deletions
@@ -37,6 +37,7 @@ import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.input.WholeTextFileInputFormat
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -369,6 +370,39 @@ class SparkContext(
       minSplits).map(pair => pair._2.toString)
   }
 
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are perferred, large file is also allowable, but may cause bad performance.
+   */
+  def wholeTextFiles(path: String): RDD[(String, String)] = {
+    newAPIHadoopFile(
+      path,
+      classOf[WholeTextFileInputFormat],
+      classOf[String],
+      classOf[String])
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
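
A minimal usage sketch of the new API (not part of the commit), assuming a local master and the hypothetical hdfs://a-hdfs-path directory from the scaladoc example:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object WholeTextFilesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("whole-text-files"))

    // Each record is (filePath, fileContent); a file is never split across records.
    val files = sc.wholeTextFiles("hdfs://a-hdfs-path")

    // For example, count the lines in each file.
    val lineCounts = files.mapValues(_.split("\n").length)
    lineCounts.collect().foreach { case (path, n) => println(s"$path: $n lines") }

    sc.stop()
  }
}

Because isSplitable is overridden to false (see WholeTextFileInputFormat below), each file becomes a single record, which is why the scaladoc note prefers small files.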

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 28 additions & 0 deletions
@@ -154,6 +154,34 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    */
  def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
 
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do `JavaPairRDD<String, String> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are perferred, large file is also allowable, but may cause bad performance.
+   */
+  def wholeTextFiles(path: String): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path))
+
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
    *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
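
The Java wrapper simply delegates to the Scala method. A minimal sketch (not part of the commit) driving it from Scala, assuming an existing SparkContext:

import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

// Wrap an existing SparkContext and read a hypothetical directory of small files.
def readViaJavaApi(sc: SparkContext, dir: String): JavaPairRDD[String, String] = {
  val jsc = new JavaSparkContext(sc)
  jsc.wholeTextFiles(dir) // delegates to sc.wholeTextFiles and wraps the result in a JavaPairRDD
}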

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+
+/**
+ * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
+ * reading whole text files. Each file is read as key-value pair, where the key is the file path and
+ * the value is the entire content of file.
+ */
+
+private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+
+  override def createRecordReader(
+      split: InputSplit,
+      context: TaskAttemptContext): RecordReader[String, String] = {
+
+    new CombineFileRecordReader[String, String](
+      split.asInstanceOf[CombineFileSplit],
+      context,
+      classOf[WholeTextFileRecordReader])
+  }
+}

core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala

Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import com.google.common.io.{ByteStreams, Closeables}
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeTextFileRecordReader(
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, String] {
+
+  private val path = split.getPath(index)
+  private val fs = path.getFileSystem(context.getConfiguration)
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private val key = path.toString
+  private var value: String = null
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+  override def nextKeyValue = {
+    if (!processed) {
+      val fileIn = fs.open(path)
+      val innerBuffer = ByteStreams.toByteArray(fileIn)
+
+      value = new Text(innerBuffer).toString
+      Closeables.close(fileIn, false)
+
+      processed = true
+      true
+    } else {
+      false
+    }
+  }
+}
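
The reader's core pattern (open the file, read all bytes, decode through Hadoop's Text) can also be sketched standalone; the helper below is not part of the commit and assumes a Hadoop Configuration plus an arbitrary path:

import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text

// Read one whole file and return the same (path, content) pair the record reader emits.
def readWholeFile(pathStr: String, conf: Configuration = new Configuration()): (String, String) = {
  val path = new Path(pathStr)
  val fs = path.getFileSystem(conf)
  val in = fs.open(path)
  try {
    (path.toString, new Text(ByteStreams.toByteArray(in)).toString)
  } finally {
    Closeables.close(in, false) // `false` propagates close() failures, matching the reader
  }
}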

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 11 deletions
@@ -84,7 +84,7 @@ class DAGScheduler(
   private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
   private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
   private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
-  private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
+  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
   private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
 
@@ -536,7 +536,7 @@ class DAGScheduler(
       listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
       runLocally(job)
     } else {
-      stageIdToActiveJob(jobId) = job
+      jobIdToActiveJob(jobId) = job
       activeJobs += job
       resultStageToJob(finalStage) = job
       listenerBus.post(
@@ -559,7 +559,7 @@ class DAGScheduler(
       // Cancel all running jobs.
       runningStages.map(_.jobId).foreach(handleJobCancellation)
       activeJobs.clear()       // These should already be empty by this point,
-      stageIdToActiveJob.clear()   // but just in case we lost track of some jobs...
+      jobIdToActiveJob.clear()     // but just in case we lost track of some jobs...
 
     case ExecutorAdded(execId, host) =>
       handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@ class DAGScheduler(
 
     case BeginEvent(task, taskInfo) =>
       for (
-        job <- stageIdToActiveJob.get(task.stageId);
         stage <- stageIdToStage.get(task.stageId);
         stageInfo <- stageToInfos.get(stage)
       ) {
@@ -697,7 +696,7 @@ class DAGScheduler(
   private def activeJobForStage(stage: Stage): Option[Int] = {
     if (stageIdToJobIds.contains(stage.id)) {
      val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
-     jobsThatUseStage.find(stageIdToActiveJob.contains)
+     jobsThatUseStage.find(jobIdToActiveJob.contains)
    } else {
      None
    }
@@ -750,8 +749,8 @@ class DAGScheduler(
        }
      }
 
-     val properties = if (stageIdToActiveJob.contains(jobId)) {
-       stageIdToActiveJob(stage.jobId).properties
+     val properties = if (jobIdToActiveJob.contains(jobId)) {
+       jobIdToActiveJob(stage.jobId).properties
      } else {
        // this stage will be assigned to "default" pool
        null
@@ -827,7 +826,7 @@ class DAGScheduler(
            job.numFinished += 1
            // If the whole job has finished, remove it
            if (job.numFinished == job.numPartitions) {
-             stageIdToActiveJob -= stage.jobId
+             jobIdToActiveJob -= stage.jobId
              activeJobs -= job
              resultStageToJob -= stage
              markStageAsFinished(stage)
@@ -986,11 +985,11 @@ class DAGScheduler(
      val independentStages = removeJobAndIndependentStages(jobId)
      independentStages.foreach(taskScheduler.cancelTasks)
      val error = new SparkException("Job %d cancelled".format(jobId))
-     val job = stageIdToActiveJob(jobId)
+     val job = jobIdToActiveJob(jobId)
      job.listener.jobFailed(error)
      jobIdToStageIds -= jobId
      activeJobs -= job
-     stageIdToActiveJob -= jobId
+     jobIdToActiveJob -= jobId
      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
    }
  }
@@ -1011,7 +1010,7 @@ class DAGScheduler(
      val error = new SparkException("Job aborted: " + reason)
      job.listener.jobFailed(error)
      jobIdToStageIdsRemove(job.jobId)
-     stageIdToActiveJob -= resultStage.jobId
+     jobIdToActiveJob -= resultStage.jobId
      activeJobs -= job
      resultStageToJob -= resultStage
      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -18,13 +18,14 @@
 package org.apache.spark.ui
 
 import java.net.{InetSocketAddress, URL}
+import javax.servlet.DispatcherType
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
 import scala.annotation.tailrec
 import scala.util.{Failure, Success, Try}
 import scala.xml.Node
 
-import org.eclipse.jetty.server.{DispatcherType, Server}
+import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.handler._
 import org.eclipse.jetty.servlet._
 import org.eclipse.jetty.util.thread.QueuedThreadPool
