Commit 47ce99d

Merge branch 'master' into openstack
2 parents 99f095d + 237b96b commit 47ce99d

60 files changed: +599 / -307 lines

core/pom.xml

Lines changed: 1 addition & 0 deletions
@@ -70,6 +70,7 @@
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
+      <version>3.3</version>
       <scope>test</scope>
     </dependency>
     <dependency>

core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala

Lines changed: 2 additions & 2 deletions
@@ -19,9 +19,9 @@ package org.apache.spark.deploy

 private[spark] object ExecutorState extends Enumeration {

-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

   type ExecutorState = Value

-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }
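
The new EXITED value lets a worker report a clean executor shutdown separately from FAILED while still being treated as terminal. A minimal sketch of how calling code might distinguish the two (illustrative only; ExecutorState is private[spark], so this assumes code inside that package):

def describe(state: ExecutorState.Value): String = {
  if (!ExecutorState.isFinished(state)) "still running"
  else if (state == ExecutorState.EXITED) "finished cleanly"      // new in this commit
  else "finished abnormally (KILLED, FAILED or LOST)"
}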

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 2 deletions
@@ -303,10 +303,11 @@ private[spark] class Master(
           appInfo.removeExecutor(exec)
           exec.worker.removeExecutor(exec)

+          val normalExit = exitStatus.exists(_ == 0)
           // Only retry certain number of times so we don't go into an infinite loop.
-          if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+          if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
            schedule()
-          } else {
+          } else if (!normalExit) {
            logError("Application %s with ID %s failed %d times, removing it".format(
              appInfo.desc.name, appInfo.id, appInfo.retryCount))
            removeApplication(appInfo, ApplicationState.FAILED)

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 4 deletions
@@ -154,11 +154,10 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
+      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+      // or with nonzero exit code
       val exitCode = process.waitFor()
-      state = ExecutorState.FAILED
+      state = ExecutorState.EXITED
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 7 additions & 4 deletions
@@ -139,10 +139,13 @@ class HadoopRDD[K, V](
       // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
       // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
       // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      val newJobConf = new JobConf(broadcastedConf.value.value)
-      initLocalJobConfFuncOpt.map(f => f(newJobConf))
-      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-      newJobConf
+      // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456)
+      broadcastedConf.synchronized {
+        val newJobConf = new JobConf(broadcastedConf.value.value)
+        initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+        newJobConf
+      }
     }
   }
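
The race here is that the JobConf copy constructor iterates over a Configuration shared through the broadcast while another thread may be mutating it, so every cloner now serializes on the broadcast object. A standalone sketch of the same pattern (not Spark code; plain Hadoop classes only):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Cloning a shared Configuration can throw ConcurrentModificationException if another
// thread adds a property mid-copy; locking on the shared instance prevents that.
def cloneConf(shared: Configuration): JobConf = shared.synchronized {
  new JobConf(shared)
}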

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 2 additions & 4 deletions
@@ -787,8 +787,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance

-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
-        jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -854,8 +853,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")

-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
-        outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
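
With the isInstanceOf guard gone, checkOutputSpecs now runs for any OutputFormat, not just FileOutputFormat subclasses, so jobs that deliberately write into existing output need to opt out via the flag referenced above. A hedged sketch of doing so (master, app name and path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("overwrite-example")
  .set("spark.hadoop.validateOutputSpecs", "false")   // skip the output-spec check
val sc = new SparkContext(conf)
sc.parallelize(Seq(("a", 1), ("b", 2))).saveAsTextFile("/tmp/out")   // path may already exist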

core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala

Lines changed: 19 additions & 12 deletions
@@ -117,6 +117,15 @@ private object ParallelCollectionRDD {
     if (numSlices < 1) {
       throw new IllegalArgumentException("Positive number of slices required")
     }
+    // Sequences need to be sliced at the same set of index positions for operations
+    // like RDD.zip() to behave as expected
+    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
+      (0 until numSlices).iterator.map(i => {
+        val start = ((i * length) / numSlices).toInt
+        val end = (((i + 1) * length) / numSlices).toInt
+        (start, end)
+      })
+    }
     seq match {
       case r: Range.Inclusive => {
         val sign = if (r.step < 0) {
@@ -128,30 +137,28 @@
           r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
       }
       case r: Range => {
-        (0 until numSlices).map(i => {
-          val start = ((i * r.length.toLong) / numSlices).toInt
-          val end = (((i + 1) * r.length.toLong) / numSlices).toInt
-          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
-        }).asInstanceOf[Seq[Seq[T]]]
+        positions(r.length, numSlices).map({
+          case (start, end) =>
+            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+        }).toSeq.asInstanceOf[Seq[Seq[T]]]
       }
       case nr: NumericRange[_] => {
         // For ranges of Long, Double, BigInteger, etc
         val slices = new ArrayBuffer[Seq[T]](numSlices)
-        val sliceSize = (nr.size + numSlices - 1) / numSlices // Round up to catch everything
         var r = nr
-        for (i <- 0 until numSlices) {
+        for ((start, end) <- positions(nr.length, numSlices)) {
+          val sliceSize = end - start
           slices += r.take(sliceSize).asInstanceOf[Seq[T]]
           r = r.drop(sliceSize)
         }
         slices
       }
       case _ => {
         val array = seq.toArray // To prevent O(n^2) operations for List etc
-        (0 until numSlices).map(i => {
-          val start = ((i * array.length.toLong) / numSlices).toInt
-          val end = (((i + 1) * array.length.toLong) / numSlices).toInt
-          array.slice(start, end).toSeq
-        })
+        positions(array.length, numSlices).map({
+          case (start, end) =>
+            array.slice(start, end).toSeq
+        }).toSeq
       }
     }
   }
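
The shared positions helper guarantees that a Range, a NumericRange and a plain collection are cut at the same index boundaries. A small standalone copy of that logic, just to show the boundaries it produces:

// Slicing 7 elements into 4 parts yields (0,1), (1,3), (3,5), (5,7),
// i.e. slice sizes 1, 2, 2, 2 regardless of the underlying sequence type.
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

positions(7, 4).toList   // List((0,1), (1,3), (3,5), (5,7))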

core/src/main/scala/org/apache/spark/storage/RDDInfo.scala

Lines changed: 3 additions & 3 deletions
@@ -26,7 +26,7 @@ class RDDInfo(
     val id: Int,
     val name: String,
     val numPartitions: Int,
-    val storageLevel: StorageLevel)
+    var storageLevel: StorageLevel)
   extends Ordered[RDDInfo] {

   var numCachedPartitions = 0
@@ -36,8 +36,8 @@ class RDDInfo(

   override def toString = {
     import Utils.bytesToString
-    ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
-      "TachyonSize: %s; DiskSize: %s").format(
+    ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
+      "MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
       name, id, storageLevel.toString, numCachedPartitions, numPartitions,
       bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
   }

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 3 additions & 0 deletions
@@ -89,10 +89,13 @@ private[spark] object StorageUtils {
     // Add up memory, disk and Tachyon sizes
     val persistedBlocks =
       blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
+    val _storageLevel =
+      if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
     val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
     val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
     val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
     rddInfoMap.get(rddId).map { rddInfo =>
+      rddInfo.storageLevel = _storageLevel
       rddInfo.numCachedPartitions = persistedBlocks.length
       rddInfo.memSize = memSize
       rddInfo.diskSize = diskSize

core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala

Lines changed: 18 additions & 0 deletions
@@ -111,6 +111,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     assert(slices.forall(_.isInstanceOf[Range]))
   }

+  test("identical slice sizes between Range and NumericRange") {
+    val r = ParallelCollectionRDD.slice(1 to 7, 4)
+    val nr = ParallelCollectionRDD.slice(1L to 7L, 4)
+    assert(r.size === 4)
+    for (i <- 0 until r.size) {
+      assert(r(i).size === nr(i).size)
+    }
+  }
+
+  test("identical slice sizes between List and NumericRange") {
+    val r = ParallelCollectionRDD.slice(List(1, 2), 4)
+    val nr = ParallelCollectionRDD.slice(1L to 2L, 4)
+    assert(r.size === 4)
+    for (i <- 0 until r.size) {
+      assert(r(i).size === nr(i).size)
+    }
+  }
+
   test("large ranges don't overflow") {
     val N = 100 * 1000 * 1000
     val data = 0 until N
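
These tests pin down the property that matters for zipping: corresponding partitions built from differently typed sequences must contain the same number of elements. A hedged usage sketch (assumes an existing SparkContext named sc):

val a = sc.parallelize(1 to 7, 4)     // Range, sliced into sizes 1, 2, 2, 2
val b = sc.parallelize(1L to 7L, 4)   // NumericRange, now sliced identically
a.zip(b).collect()                    // zip requires matching partition sizes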

docs/programming-guide.md

Lines changed: 21 additions & 7 deletions
@@ -377,13 +377,15 @@ Some notes on reading files with Spark:

 * The `textFile` method also takes an optional second argument for controlling the number of slices of the file. By default, Spark creates one slice for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of slices by passing a larger value. Note that you cannot have fewer slices than blocks.

-Apart from reading files as a collection of lines,
-`SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.
+Apart from text files, Spark's Python API also supports several other data formats:

-### SequenceFile and Hadoop InputFormats
+* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.
+
+* `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.

-In addition to reading text files, PySpark supports reading ```SequenceFile```
-and any arbitrary ```InputFormat```.
+* Details on reading `SequenceFile` and arbitrary Hadoop `InputFormat` are given below.
+
+### SequenceFile and Hadoop InputFormats

 **Note** this feature is currently marked ```Experimental``` and is intended for advanced users. It may be replaced in future with read/write support based on SparkSQL, in which case SparkSQL is the preferred approach.

@@ -760,6 +762,11 @@ val counts = pairs.reduceByKey((a, b) => a + b)
 We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
 `counts.collect()` to bring them back to the driver program as an array of objects.

+**Note:** when using custom objects as the key in key-value pair operations, you must be sure that a
+custom `equals()` method is accompanied with a matching `hashCode()` method. For full details, see
+the contract outlined in the [Object.hashCode()
+documentation](http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()).
+
 </div>

 <div data-lang="java" markdown="1">
@@ -792,6 +799,10 @@ JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
 We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
 `counts.collect()` to bring them back to the driver program as an array of objects.

+**Note:** when using custom objects as the key in key-value pair operations, you must be sure that a
+custom `equals()` method is accompanied with a matching `hashCode()` method. For full details, see
+the contract outlined in the [Object.hashCode()
+documentation](http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()).

 </div>

@@ -888,7 +899,7 @@ for details.
 </tr>
 <tr>
   <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
-  <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
+  <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
 </tr>
 <tr>
   <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>
@@ -1056,7 +1067,10 @@ storage levels is:
   <td> Store RDD in serialized format in <a href="http://tachyon-project.org">Tachyon</a>.
     Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors
     to be smaller and to share a pool of memory, making it attractive in environments with
-    large heaps or multiple concurrent applications.
+    large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon,
+    the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory
+    in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts
+    from memory.
   </td>
 </tr>
 </table>
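
The equals()/hashCode() note above is easy to satisfy in Scala, where a case class derives both methods consistently; the updated reduceByKey description also spells out that the reduce function has type (V,V) => V. A hedged sketch combining the two (assumes an existing SparkContext named sc; UserKey is a made-up key type):

case class UserKey(id: Int, region: String)   // equals() and hashCode() generated together

val pairs = sc.parallelize(Seq((UserKey(1, "eu"), 3), (UserKey(1, "eu"), 4)))
val counts = pairs.reduceByKey((a, b) => a + b)   // reduce function is (V, V) => V
counts.collect()                                  // Array((UserKey(1,eu), 7))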

docs/running-on-yarn.md

Lines changed: 14 additions & 0 deletions
@@ -67,6 +67,20 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.executor.memoryOverhead</code></td>
+  <td>384</td>
+  <td>
+    The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.driver.memoryOverhead</code></td>
+  <td>384</td>
+  <td>
+    The amount of off-heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+  </td>
+</tr>
 </table>

 By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`.
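
Both overhead settings are plain Spark configuration keys, so they can be raised like any other property. A hedged sketch (the values in MB are illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "512")
  .set("spark.yarn.driver.memoryOverhead", "512")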

docs/streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -950,7 +950,7 @@ is 200 milliseconds.

 An alternative to receiving data with multiple input streams / receivers is to explicitly repartition
 the input data stream (using `inputStream.repartition(<number of partitions>)`).
-This distributes the received batches of data across all the machines in the cluster
+This distributes the received batches of data across the specified number of machines in the cluster
 before further processing.

 ### Level of Parallelism in Data Processing
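
A hedged sketch of the repartition call described above (assumes an existing StreamingContext named ssc and a placeholder host and port):

val lines = ssc.socketTextStream("localhost", 9999)
val repartitioned = lines.repartition(8)   // spread each received batch across 8 partitions
repartitioned.flatMap(_.split(" ")).count().print()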

python/pyspark/sql.py

Lines changed: 24 additions & 2 deletions
@@ -16,6 +16,7 @@
 #

 from pyspark.rdd import RDD
+from pyspark.serializers import BatchedSerializer, PickleSerializer

 from py4j.protocol import Py4JError

@@ -76,12 +77,25 @@ def inferSchema(self, rdd):
         """Infer and apply a schema to an RDD of L{dict}s.

         We peek at the first row of the RDD to determine the fields names
-        and types, and then use that to extract all the dictionaries.
+        and types, and then use that to extract all the dictionaries. Nested
+        collections are supported, which include array, dict, list, set, and
+        tuple.

         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
         ... {"field1" : 3, "field2": "row3"}]
         True
+
+        >>> from array import array
+        >>> srdd = sqlCtx.inferSchema(nestedRdd1)
+        >>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
+        ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]
+        True
+
+        >>> srdd = sqlCtx.inferSchema(nestedRdd2)
+        >>> srdd.collect() == [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
+        ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]
+        True
         """
         if (rdd.__class__ is SchemaRDD):
             raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__)
@@ -346,7 +360,8 @@ def _toPython(self):
         # TODO: This is inefficient, we should construct the Python Row object
         # in Java land in the javaToPython function. May require a custom
         # pickle serializer in Pyrolite
-        return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d))
+        return RDD(jrdd, self._sc, BatchedSerializer(
+            PickleSerializer())).map(lambda d: Row(d))

     # We override the default cache/persist/checkpoint behavior as we want to cache the underlying
     # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class
@@ -411,6 +426,7 @@ def subtract(self, other, numPartitions=None):

 def _test():
     import doctest
+    from array import array
     from pyspark.context import SparkContext
     globs = globals().copy()
     # The small batch size here ensures that we see multiple batches,
@@ -420,6 +436,12 @@ def _test():
     globs['sqlCtx'] = SQLContext(sc)
     globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
         {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
+    globs['nestedRdd1'] = sc.parallelize([
+        {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
+        {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
+    globs['nestedRdd2'] = sc.parallelize([
+        {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
+        {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}])
     (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
