Commit 45884ab

Merge remote-tracking branch 'upstream/master' into ldaonline

2 parents: f41c5ca + 5de14cc

File tree: 166 files changed, +7832 −766 lines


assembly/pom.xml (4 additions & 4 deletions)

@@ -39,7 +39,7 @@
     <deb.pkg.name>spark</deb.pkg.name>
     <deb.install.path>/usr/share/spark</deb.install.path>
     <deb.user>root</deb.user>
-    <deb.bin.filemode>744</deb.bin.filemode>
+    <deb.bin.filemode>755</deb.bin.filemode>
   </properties>

   <dependencies>
@@ -280,7 +280,7 @@
             <user>${deb.user}</user>
             <group>${deb.user}</group>
             <prefix>${deb.install.path}/conf</prefix>
-            <filemode>744</filemode>
+            <filemode>${deb.bin.filemode}</filemode>
           </mapper>
         </data>
         <data>
@@ -302,7 +302,7 @@
             <user>${deb.user}</user>
             <group>${deb.user}</group>
             <prefix>${deb.install.path}/sbin</prefix>
-            <filemode>744</filemode>
+            <filemode>${deb.bin.filemode}</filemode>
           </mapper>
         </data>
         <data>
@@ -313,7 +313,7 @@
             <user>${deb.user}</user>
             <group>${deb.user}</group>
             <prefix>${deb.install.path}/python</prefix>
-            <filemode>744</filemode>
+            <filemode>${deb.bin.filemode}</filemode>
           </mapper>
         </data>
       </dataSet>

bin/spark-shell.cmd (file mode changed from 100755 to 100644)

bin/spark-submit2.cmd (1 addition & 1 deletion)

@@ -25,7 +25,7 @@ set ORIG_ARGS=%*
 rem Reset the values of all variables used
 set SPARK_SUBMIT_DEPLOY_MODE=client

-if not defined %SPARK_CONF_DIR% (
+if [%SPARK_CONF_DIR%] == [] (
   set SPARK_CONF_DIR=%SPARK_HOME%\conf
 )
 set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_CONF_DIR%\spark-defaults.conf

core/pom.xml (8 additions & 0 deletions)

@@ -243,6 +243,14 @@
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-graphite</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.module</groupId>
+      <artifactId>jackson-module-scala_2.10</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
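
The two new Jackson artifacts are the usual pairing for JSON (de)serialization of Scala types: jackson-databind supplies the ObjectMapper, and jackson-module-scala teaches it about case classes and Scala collections. A hedged, self-contained sketch of that pairing; the AppInfo case class is purely illustrative and not part of this commit:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Illustrative payload type; not part of this commit.
case class AppInfo(name: String, cores: Int, tags: Seq[String])

object JacksonScalaExample {
  def main(args: Array[String]): Unit = {
    // jackson-databind provides ObjectMapper; jackson-module-scala makes it
    // understand case classes and Scala collections.
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

    val json = mapper.writeValueAsString(AppInfo("demo", 4, Seq("batch", "test")))
    println(json)  // {"name":"demo","cores":4,"tags":["batch","test"]}

    val parsed = mapper.readValue(json, classOf[AppInfo])
    println(parsed.cores)  // 4
  }
}
```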

core/src/main/resources/org/apache/spark/ui/static/webui.css (6 additions & 0 deletions)

@@ -103,6 +103,12 @@ span.expand-details {
   float: right;
 }

+span.rest-uri {
+  font-size: 10pt;
+  font-style: italic;
+  color: gray;
+}
+
 pre {
   font-size: 0.8em;
 }

core/src/main/scala/org/apache/spark/CacheManager.scala (8 additions & 3 deletions)

@@ -47,10 +47,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
           .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.addBytesRead(inputMetrics.bytesRead)
-
-        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
+        existingMetrics.incBytesRead(inputMetrics.bytesRead)

+        val iter = blockResult.data.asInstanceOf[Iterator[T]]
+        new InterruptibleIterator[T](context, iter) {
+          override def next(): T = {
+            existingMetrics.incRecordsRead(1)
+            delegate.next()
+          }
+        }
       case None =>
         // Acquire a lock for loading this partition
         // If another thread already holds the lock, wait for it to finish return its results
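
The anonymous InterruptibleIterator subclass above bumps the records-read metric on every call to next() before delegating. A minimal standalone sketch of the same wrap-and-count pattern, outside Spark, with illustrative names (CountingIterator is not part of this commit):

```scala
// Sketch: wrap an iterator so each next() call updates a counter,
// mirroring how the patched CacheManager calls incRecordsRead(1).
class CountingIterator[T](delegate: Iterator[T]) extends Iterator[T] {
  private var recordsRead = 0L

  override def hasNext: Boolean = delegate.hasNext

  override def next(): T = {
    recordsRead += 1          // analogous to existingMetrics.incRecordsRead(1)
    delegate.next()
  }

  def count: Long = recordsRead
}

object CountingIteratorExample extends App {
  val it = new CountingIterator(Iterator(1, 2, 3))
  it.foreach(_ => ())                      // drain the iterator
  println(s"records read: ${it.count}")    // prints: records read: 3
}
```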

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala (7 additions & 8 deletions)

@@ -76,15 +76,15 @@ private[spark] class ExecutorAllocationManager(
   private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
     Integer.MAX_VALUE)

-  // How long there must be backlogged tasks for before an addition is triggered
+  // How long there must be backlogged tasks for before an addition is triggered (seconds)
   private val schedulerBacklogTimeout = conf.getLong(
-    "spark.dynamicAllocation.schedulerBacklogTimeout", 60)
+    "spark.dynamicAllocation.schedulerBacklogTimeout", 5)

   // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
   private val sustainedSchedulerBacklogTimeout = conf.getLong(
     "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)

-  // How long an executor must be idle for before it is removed
+  // How long an executor must be idle for before it is removed (seconds)
   private val executorIdleTimeout = conf.getLong(
     "spark.dynamicAllocation.executorIdleTimeout", 600)

@@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }

-    override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
-      val executorId = blockManagerAdded.blockManagerId.executorId
+    override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+      val executorId = executorAdded.executorId
       if (executorId != SparkContext.DRIVER_IDENTIFIER) {
         // This guards against the race condition in which the `SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, which is
@@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }

-    override def onBlockManagerRemoved(
-        blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
-      allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+    override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+      allocationManager.onExecutorRemoved(executorRemoved.executorId)
     }

     /**
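
With this change the allocation manager reacts to executor lifecycle events directly instead of inferring them from block manager events. A hedged sketch of wiring a custom listener to the same two callbacks, assuming the Spark 1.x listener API used in the diff; the local master and println logging are illustrative only:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Sketch: a listener consuming the same executor lifecycle events the
// ExecutorAllocationManager now uses (Spark 1.x-era API assumed).
class ExecutorTracker extends SparkListener {
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    println(s"executor added: ${executorAdded.executorId}")
  }

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
    println(s"executor removed: ${executorRemoved.executorId}")
  }
}

object ExecutorTrackerExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tracker").setMaster("local[2]"))
    sc.addSparkListener(new ExecutorTracker)   // register before running jobs
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}
```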

core/src/main/scala/org/apache/spark/SparkContext.scala (19 additions & 3 deletions)

@@ -288,7 +288,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // the bound port to the cluster manager properly
   ui.foreach(_.bind())

-  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  /**
+   * A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
+   *
+   * '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
+   * plan to set some global configurations for all Hadoop RDDs.
+   */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

   // Add each JAR given through the constructor
@@ -694,7 +699,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
   * using the older MapReduce API (`org.apache.hadoop.mapred`).
   *
-  * @param conf JobConf for setting up the dataset
+  * @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
+  *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+  *             sure you won't modify the conf. A safe approach is always creating a new conf for
+  *             a new RDD.
   * @param inputFormatClass Class of the InputFormat
   * @param keyClass Class of the keys
   * @param valueClass Class of the values
@@ -830,6 +838,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
   * and extra configuration options to pass to the input format.
   *
+  * @param conf Configuration for setting up the dataset. Note: This will be put into a Broadcast.
+  *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+  *             sure you won't modify the conf. A safe approach is always creating a new conf for
+  *             a new RDD.
+  * @param fClass Class of the InputFormat
+  * @param kClass Class of the keys
+  * @param vClass Class of the values
+  *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
   * operation will create many references to the same object.
@@ -2094,7 +2110,7 @@ object SparkContext extends Logging {

     val scheduler = new TaskSchedulerImpl(sc)
     val localCluster = new LocalSparkCluster(
-      numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
+      numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
     val masterUrls = localCluster.start()
     val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
     scheduler.initialize(backend)
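
The added @param notes all make the same point: the JobConf or Configuration handed to hadoopRDD and friends is put into a Broadcast, so a single conf should not be reused and mutated across RDDs. A hedged sketch of the recommended pattern (a fresh JobConf per RDD); the input paths and app name are illustrative only:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the pattern the new scaladoc recommends: build a fresh JobConf
// for each RDD instead of mutating one shared conf (it gets broadcast).
object FreshConfPerRdd {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fresh-conf").setMaster("local[2]"))

    def textRdd(path: String) = {
      val conf = new JobConf(sc.hadoopConfiguration)   // new conf per RDD
      FileInputFormat.setInputPaths(conf, path)        // illustrative input path
      sc.hadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    }

    val a = textRdd("/tmp/input-a")   // hypothetical paths
    val b = textRdd("/tmp/input-b")
    println(a.count() + b.count())
    sc.stop()
  }
}
```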

core/src/main/scala/org/apache/spark/TestUtils.scala (27 additions & 7 deletions)

@@ -43,11 +43,20 @@ private[spark] object TestUtils {
   * Note: if this is used during class loader tests, class names should be unique
   * in order to avoid interference between tests.
   */
-  def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
+  def createJarWithClasses(
+      classNames: Seq[String],
+      toStringValue: String = "",
+      classNamesWithBase: Seq[(String, String)] = Seq(),
+      classpathUrls: Seq[URL] = Seq()): URL = {
     val tempDir = Utils.createTempDir()
-    val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
+    val files1 = for (name <- classNames) yield {
+      createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
+    }
+    val files2 = for ((childName, baseName) <- classNamesWithBase) yield {
+      createCompiledClass(childName, tempDir, toStringValue, baseName, classpathUrls)
+    }
     val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
-    createJar(files, jarFile)
+    createJar(files1 ++ files2, jarFile)
   }

@@ -85,15 +94,26 @@ private[spark] object TestUtils {
   }

   /** Creates a compiled class with the given name. Class file will be placed in destDir. */
-  def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
+  def createCompiledClass(
+      className: String,
+      destDir: File,
+      toStringValue: String = "",
+      baseClass: String = null,
+      classpathUrls: Seq[URL] = Seq()): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
+    val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
     val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + " implements java.io.Serializable {" +
-      "  @Override public String toString() { return \"" + value + "\"; }}")
+      "public class " + className + extendsText + " implements java.io.Serializable {" +
+      "  @Override public String toString() { return \"" + toStringValue + "\"; }}")

     // Calling this outputs a class file in pwd. It's easier to just rename the file than
     // build a custom FileManager that controls the output location.
-    compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call()
+    val options = if (classpathUrls.nonEmpty) {
+      Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator))
+    } else {
+      Seq()
+    }
+    compiler.getTask(null, null, null, options, null, Seq(sourceFile)).call()

     val fileName = className + ".class"
     val result = new File(fileName)
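
createJarWithClasses and createCompiledClass now accept a base class name and extra classpath entries, so tests can build jars whose generated classes extend types found elsewhere. A hedged usage sketch with illustrative class names and jar URL; note that TestUtils is private[spark], so a real caller would live under the org.apache.spark package:

```scala
// Hedged usage sketch; placed in the org.apache.spark package because
// TestUtils is private[spark]. Names and the extra jar URL are illustrative.
package org.apache.spark

import java.net.URL

object TestUtilsUsage {
  def main(args: Array[String]): Unit = {
    // Hypothetical jar that provides BaseClass for the generated child class to extend.
    val extraClasspath: Seq[URL] = Seq(new URL("file:/tmp/base-classes.jar"))

    val jarUrl: URL = TestUtils.createJarWithClasses(
      classNames = Seq("FooClass", "BarClass"),              // plain generated classes
      toStringValue = "hello",                                // value returned by toString()
      classNamesWithBase = Seq(("ChildClass", "BaseClass")),  // ChildClass extends BaseClass
      classpathUrls = extraClasspath)                         // classpath used for compilation

    println(s"test jar written to $jarUrl")
  }
}
```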

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala (28 additions & 0 deletions)

@@ -373,6 +373,15 @@ class JavaSparkContext(val sc: SparkContext)
   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
   * etc).
   *
+  * @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
+  *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+  *             sure you won't modify the conf. A safe approach is always creating a new conf for
+  *             a new RDD.
+  * @param inputFormatClass Class of the InputFormat
+  * @param keyClass Class of the keys
+  * @param valueClass Class of the values
+  * @param minPartitions Minimum number of Hadoop Splits to generate.
+  *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD will create many references to the same object.
   * If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -395,6 +404,14 @@ class JavaSparkContext(val sc: SparkContext)
   * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
   *
+  * @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
+  *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+  *             sure you won't modify the conf. A safe approach is always creating a new conf for
+  *             a new RDD.
+  * @param inputFormatClass Class of the InputFormat
+  * @param keyClass Class of the keys
+  * @param valueClass Class of the values
+  *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD will create many references to the same object.
   * If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -476,6 +493,14 @@ class JavaSparkContext(val sc: SparkContext)
   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
   * and extra configuration options to pass to the input format.
   *
+  * @param conf Configuration for setting up the dataset. Note: This will be put into a Broadcast.
+  *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+  *             sure you won't modify the conf. A safe approach is always creating a new conf for
+  *             a new RDD.
+  * @param fClass Class of the InputFormat
+  * @param kClass Class of the keys
+  * @param vClass Class of the values
+  *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD will create many references to the same object.
   * If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -675,6 +700,9 @@ class JavaSparkContext(val sc: SparkContext)

   /**
    * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
+   *
+   * '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
+   * plan to set some global configurations for all Hadoop RDDs.
    */
   def hadoopConfiguration(): Configuration = {
     sc.hadoopConfiguration
