
Commit e7d9aa6

Commit message: rebase to master
2 parents: 326a17f + a7d145e

177 files changed (+4901, -1029 lines)


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ unit-tests.log
 rat-results.txt
 scalastyle.txt
 conf/*.conf
+scalastyle-output.xml

 # For Hive
 metastore_db/

assembly/pom.xml

Lines changed: 2 additions & 1 deletion
@@ -39,6 +39,7 @@
     <deb.pkg.name>spark</deb.pkg.name>
     <deb.install.path>/usr/share/spark</deb.install.path>
     <deb.user>root</deb.user>
+    <deb.bin.filemode>744</deb.bin.filemode>
   </properties>

   <dependencies>
@@ -276,7 +277,7 @@
             <user>${deb.user}</user>
             <group>${deb.user}</group>
             <prefix>${deb.install.path}/bin</prefix>
-            <filemode>744</filemode>
+            <filemode>${deb.bin.filemode}</filemode>
           </mapper>
         </data>
         <data>

bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala

Lines changed: 5 additions & 0 deletions
@@ -72,6 +72,7 @@ object Bagel extends Logging {
     var verts = vertices
     var msgs = messages
     var noActivity = false
+    var lastRDD: RDD[(K, (V, Array[M]))] = null
     do {
       logInfo("Starting superstep " + superstep + ".")
       val startTime = System.currentTimeMillis
@@ -83,6 +84,10 @@ object Bagel extends Logging {
       val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
       val (processed, numMsgs, numActiveVerts) =
        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
+      if (lastRDD != null) {
+        lastRDD.unpersist(false)
+      }
+      lastRDD = processed

       val timeTaken = System.currentTimeMillis - startTime
       logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))

core/src/main/scala/org/apache/spark/Aggregator.scala

Lines changed: 1 addition & 4 deletions
@@ -55,10 +55,7 @@ case class Aggregator[K, V, C] (
       combiners.iterator
     } else {
       val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-      while (iter.hasNext) {
-        val pair = iter.next()
-        combiners.insert(pair._1, pair._2)
-      }
+      combiners.insertAll(iter)
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
       Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
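The removed loop and the new insertAll call are behaviorally equivalent at this call site; the bulk method simply moves the per-record iteration inside the map. A hedged sketch of that equivalence, using an illustrative trait rather than the real ExternalAppendOnlyMap:

    // Illustrative only: a map exposing both insert and insertAll, where the bulk
    // call does what the removed while loop used to do at the call site.
    trait CombinerMap[K, V] {
      def insert(key: K, value: V): Unit
      def insertAll(entries: Iterator[Product2[K, V]]): Unit =
        entries.foreach(kv => insert(kv._1, kv._2))
    }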

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 51 additions & 21 deletions
@@ -17,9 +17,9 @@

 package org.apache.spark

-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer

-import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._

@@ -30,7 +30,7 @@ import org.apache.spark.storage._
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

   /** Keys of RDD partitions that are being computed/loaded. */
-  private val loading = new HashSet[RDDBlockId]()
+  private val loading = new mutable.HashSet[RDDBlockId]

   /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](
@@ -118,36 +118,66 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   }

   /**
-   * Cache the values of a partition, keeping track of any updates in the storage statuses
-   * of other blocks along the way.
+   * Cache the values of a partition, keeping track of any updates in the storage statuses of
+   * other blocks along the way.
+   *
+   * The effective storage level refers to the level that actually specifies BlockManager put
+   * behavior, not the level originally specified by the user. This is mainly for forcing a
+   * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
+   * while preserving the original semantics of the RDD as specified by the application.
    */
   private def putInBlockManager[T](
       key: BlockId,
       values: Iterator[T],
-      storageLevel: StorageLevel,
-      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
-
-    if (!storageLevel.useMemory) {
-      /* This RDD is not to be cached in memory, so we can just pass the computed values
-       * as an iterator directly to the BlockManager, rather than first fully unrolling
-       * it in memory. The latter option potentially uses much more memory and risks OOM
-       * exceptions that can be avoided. */
-      updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+      level: StorageLevel,
+      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
+      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
+
+    val putLevel = effectiveStorageLevel.getOrElse(level)
+    if (!putLevel.useMemory) {
+      /*
+       * This RDD is not to be cached in memory, so we can just pass the computed values as an
+       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
+       */
+      updatedBlocks ++=
+        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
       blockManager.get(key) match {
         case Some(v) => v.data.asInstanceOf[Iterator[T]]
         case None =>
           logInfo(s"Failure to store $key")
           throw new BlockException(key, s"Block manager failed to return cached value for $key!")
       }
     } else {
-      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+      /*
+       * This RDD is to be cached in memory. In this case we cannot pass the computed values
        * to the BlockManager as an iterator and expect to read it back later. This is because
-       * we may end up dropping a partition from memory store before getting it back, e.g.
-       * when the entirety of the RDD does not fit in memory. */
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-      elements.iterator.asInstanceOf[Iterator[T]]
+       * we may end up dropping a partition from memory store before getting it back.
+       *
+       * In addition, we must be careful to not unroll the entire partition in memory at once.
+       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
+       * single partition. Instead, we unroll the values cautiously, potentially aborting and
+       * dropping the partition to disk if applicable.
+       */
+      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+        case Left(arr) =>
+          // We have successfully unrolled the entire partition, so cache it in memory
+          updatedBlocks ++=
+            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+          arr.iterator.asInstanceOf[Iterator[T]]
+        case Right(it) =>
+          // There is not enough space to cache this partition in memory
+          logWarning(s"Not enough space to cache partition $key in memory! " +
+            s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
+          val returnValues = it.asInstanceOf[Iterator[T]]
+          if (putLevel.useDisk) {
+            logWarning(s"Persisting partition $key to disk instead.")
+            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
+              useOffHeap = false, deserialized = false, putLevel.replication)
+            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+          } else {
+            returnValues
+          }
+      }
     }
   }
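The rewritten putInBlockManager no longer buffers the whole partition eagerly: it asks the memory store to unroll the values within a budget, caches the resulting array only if everything fit, and otherwise falls back to the remaining iterator, optionally recursing with a disk-only level. A self-contained sketch of that control flow follows; unrollBudget, sizeOf and spillToDisk are illustrative stand-ins, not Spark APIs.

    import scala.collection.mutable.ArrayBuffer

    // Hedged sketch of the unroll-or-fall-back logic, with an explicit byte budget
    // standing in for MemoryStore.unrollSafely.
    def cacheCautiously[T](values: Iterator[T],
                           unrollBudget: Long,
                           sizeOf: T => Long,
                           useDisk: Boolean,
                           spillToDisk: Iterator[T] => Iterator[T]): Iterator[T] = {
      val unrolled = new ArrayBuffer[T]
      var used = 0L
      var fits = true
      while (fits && values.hasNext) {
        val v = values.next()
        unrolled += v
        used += sizeOf(v)
        if (used > unrollBudget) fits = false
      }
      if (fits) {
        // The whole partition fit within the budget: cache the materialized array.
        unrolled.iterator
      } else {
        // Too large to unroll safely: return what was consumed plus the rest,
        // persisting to disk instead of memory when the level allows it.
        val remaining = unrolled.iterator ++ values
        if (useDisk) spillToDisk(remaining) else remaining
      }
    }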

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 3 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.SortOrder.SortOrder
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle

@@ -62,7 +63,8 @@ class ShuffleDependency[K, V, C](
     val serializer: Option[Serializer] = None,
     val keyOrdering: Option[Ordering[K]] = None,
     val aggregator: Option[Aggregator[K, V, C]] = None,
-    val mapSideCombine: Boolean = false)
+    val mapSideCombine: Boolean = false,
+    val sortOrder: Option[SortOrder] = None)
   extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

   val shuffleId: Int = rdd.context.newShuffleId()

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 5 deletions
@@ -1037,7 +1037,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   private[spark] def getCallSite(): CallSite = {
     Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, long = "")
+      case Some(callSite) => CallSite(callSite, longForm = "")
       case None => Utils.getCallSite
     }
   }
@@ -1059,11 +1059,12 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite.short)
+    logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo(
+      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }

@@ -1144,11 +1145,12 @@ class SparkContext(config: SparkConf) extends Logging {
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite.short)
+    logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo(
+      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
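This hunk renames the CallSite fields from short/long to shortForm/longForm. A tiny hedged sketch of how the renamed fields read at the call sites above; the stand-in case class and sample values below are illustrative, not the actual class from org.apache.spark.util.

    // Illustrative stand-in for the renamed fields; not the real Spark class.
    case class CallSite(shortForm: String, longForm: String)

    val callSite = CallSite(
      shortForm = "count at App.scala:42",
      longForm = "org.apache.spark.rdd.RDD.count(RDD.scala:...)")
    println("Starting job: " + callSite.shortForm)   // the real code logs this via logInfo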

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ class SparkEnv (
     val metricsSystem: MetricsSystem,
     val conf: SparkConf) extends Logging {

-  // A mapping of thread ID to amount of memory used for shuffle in bytes
+  // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 11 additions & 1 deletion
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{EmptyRDD, RDD}

 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -112,6 +112,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork

   def startTime: java.lang.Long = sc.startTime

+  /** The version of Spark on which this application is running. */
+  def version: String = sc.version
+
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
   def defaultParallelism: java.lang.Integer = sc.defaultParallelism

@@ -132,6 +135,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
   }

+  /** Get an RDD that has no partitions or elements. */
+  def emptyRDD[T]: JavaRDD[T] = {
+    implicit val ctag: ClassTag[T] = fakeClassTag
+    JavaRDD.fromRDD(new EmptyRDD[T](sc))
+  }
+
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T]): JavaRDD[T] =
     parallelize(list, sc.defaultParallelism)
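A short usage sketch of the two new JavaSparkContext members, written in Scala for brevity (the calls look the same from Java); the app name and master below are illustrative.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.api.java.JavaSparkContext

    val sc  = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
    val jsc = new JavaSparkContext(sc)

    println(jsc.version)               // the version of Spark this application is running on
    val empty = jsc.emptyRDD[String]   // a JavaRDD[String] with no partitions or elements
    println(empty.count())             // 0
    sc.stop()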

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 6 additions & 3 deletions
@@ -37,8 +37,8 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils

-private[spark] class PythonRDD[T: ClassTag](
-    parent: RDD[T],
+private[spark] class PythonRDD(
+    parent: RDD[_],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -57,7 +57,10 @@ private[spark] class PythonRDD[T: ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
-    val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+    val localdir = env.blockManager.diskBlockManager.localDirs.map(
+      f => f.getPath()).mkString(",")
+    val worker: Socket = env.createPythonWorker(pythonExec,
+      envVars.toMap + ("SPARK_LOCAL_DIR" -> localdir))

     // Start a thread to feed the process input from our parent's iterator
     val writerThread = new WriterThread(env, worker, split, context)
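The change above forwards the executor's local scratch directories to the Python worker through a SPARK_LOCAL_DIR environment variable. A hedged sketch of just that plumbing, with made-up paths and a made-up base environment in place of diskBlockManager.localDirs and envVars.toMap:

    import java.io.File

    // Illustrative paths; the real values come from env.blockManager.diskBlockManager.localDirs.
    val localDirs = Seq(new File("/tmp/spark-local-0"), new File("/tmp/spark-local-1"))
    val localdir  = localDirs.map(_.getPath).mkString(",")

    val baseEnv   = Map("PYTHONPATH" -> "/opt/spark/python")   // stand-in for envVars.toMap
    val workerEnv = baseEnv + ("SPARK_LOCAL_DIR" -> localdir)
    // workerEnv("SPARK_LOCAL_DIR") == "/tmp/spark-local-0,/tmp/spark-local-1"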

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 3 additions & 0 deletions
@@ -269,6 +269,9 @@ object SparkSubmit {
       sysProps.getOrElseUpdate(k, v)
     }

+    // Spark properties included on command line take precedence
+    sysProps ++= args.sparkProperties
+
     (childArgs, childClasspath, sysProps, childMainClass)
   }
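The ordering matters here: defaults loaded earlier are applied with getOrElseUpdate (only filling gaps), while the --conf map is merged with ++= afterwards, so command-line properties win. A small hedged demonstration of that precedence with made-up keys and values:

    import scala.collection.mutable

    val sysProps = mutable.HashMap("spark.master" -> "local[2]")            // already set
    val defaults = Map("spark.master" -> "yarn", "spark.ui.port" -> "4040") // from a defaults file
    val fromConf = Map("spark.ui.port" -> "4050")                           // from --conf

    for ((k, v) <- defaults) { sysProps.getOrElseUpdate(k, v) }  // keeps "local[2]", adds port 4040
    sysProps ++= fromConf                                        // command line overrides: port becomes 4050
    // sysProps now maps "spark.master" -> "local[2]" and "spark.ui.port" -> "4050"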

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 11 additions & 0 deletions
@@ -55,6 +55,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
+  val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

   parseOpts(args.toList)
   loadDefaults()
@@ -177,6 +178,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     |  executorCores           $executorCores
     |  totalExecutorCores      $totalExecutorCores
     |  propertiesFile          $propertiesFile
+    |  extraSparkProperties    $sparkProperties
     |  driverMemory            $driverMemory
     |  driverCores             $driverCores
     |  driverExtraClassPath    $driverExtraClassPath
@@ -290,6 +292,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       jars = Utils.resolveURIs(value)
       parse(tail)

+    case ("--conf" | "-c") :: value :: tail =>
+      value.split("=", 2).toSeq match {
+        case Seq(k, v) => sparkProperties(k) = v
+        case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
+      }
+      parse(tail)
+
     case ("--help" | "-h") :: tail =>
       printUsageAndExit(0)

@@ -349,6 +358,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     |                              on the PYTHONPATH for Python apps.
     |  --files FILES               Comma-separated list of files to be placed in the working
     |                              directory of each executor.
+    |
+    |  --conf PROP=VALUE           Arbitrary Spark configuration property.
     |  --properties-file FILE      Path to a file from which to load extra properties. If not
     |                              specified, this will look for conf/spark-defaults.conf.
     |
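The new --conf handler splits on the first '=' only, which matters when the value itself contains '=' (Java options, key lists, and so on). A hedged sketch of that parsing behavior in isolation:

    import scala.collection.mutable

    val sparkProperties = mutable.HashMap[String, String]()

    // Split on the first '=' only, as the --conf case above does.
    def parseConf(value: String): Unit = value.split("=", 2).toSeq match {
      case Seq(k, v) => sparkProperties(k) = v
      case _         => sys.error(s"Spark config without '=': $value")  // real code calls printErrorAndExit
    }

    parseConf("spark.executor.extraJavaOptions=-XX:+UseG1GC -Dfoo=bar")
    // sparkProperties("spark.executor.extraJavaOptions") == "-XX:+UseG1GC -Dfoo=bar"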

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 1 deletion
@@ -169,7 +169,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     val ui: SparkUI = if (renderUI) {
       val conf = this.conf.clone()
       val appSecManager = new SecurityManager(conf)
-      new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
+      new SparkUI(conf, appSecManager, replayBus, appId,
+        HistoryServer.UI_PATH_PREFIX + s"/$appId")
       // Do not call ui.bind() to avoid creating a new server for each application
     } else {
       null

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
     "Last Updated")

   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    val uiAddress = "/history/" + info.id
+    val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
     val startTime = UIUtils.formatDate(info.startTime)
     val endTime = UIUtils.formatDate(info.endTime)
     val duration = UIUtils.formatDuration(info.endTime - info.startTime)

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 3 additions & 1 deletion
@@ -114,7 +114,7 @@ class HistoryServer(
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

     val contextHandler = new ServletContextHandler
-    contextHandler.setContextPath("/history")
+    contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
     contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
     attachHandler(contextHandler)
   }
@@ -172,6 +172,8 @@ class HistoryServer(
 object HistoryServer extends Logging {
   private val conf = new SparkConf

+  val UI_PATH_PREFIX = "/history"
+
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     initSecurity()
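With the prefix now defined once on the HistoryServer companion object, the servlet context path here and the per-application links built in FsHistoryProvider and HistoryPage all derive from the same constant. A one-line hedged illustration with a made-up application id:

    import org.apache.spark.deploy.history.HistoryServer

    val appId = "app-20140722120000-0001"                      // illustrative id
    val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/$appId"  // "/history/app-20140722120000-0001"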
