
Commit 2e4fe00

Merge pull request #3 from apache/master
Merge upstream updates
2 parents 89fde08 + 171ebb3 commit 2e4fe00

File tree: 155 files changed, +5996 -1116 lines changed


.rat-excludes

Lines changed: 1 addition & 0 deletions

@@ -22,6 +22,7 @@ spark-env.sh.template
 log4j-defaults.properties
 sorttable.js
 .*txt
+.*json
 .*data
 .*log
 cloudpickle.py

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 21 additions & 0 deletions

@@ -87,3 +87,24 @@ span.kill-link {
 span.kill-link a {
   color: gray;
 }
+
+span.expand-details {
+  font-size: 10pt;
+  cursor: pointer;
+  color: grey;
+  float: right;
+}
+
+.stage-details {
+  max-height: 100px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stage-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}
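The new classes back a collapsible details pane in the stage table. A minimal sketch of markup that would use them, written as Scala XML in the style the Spark UI renders (the node names, the "+ details" label, and the toggle wiring are assumptions, not part of this commit):

// Hedged sketch: markup in the style Spark's Scala-XML UI code emits.
// A click handler (not shown here) would toggle the "collapsed" class,
// letting the max-height/padding transition animate the pane open and shut.
val details = "org.apache.spark.rdd.RDD.count(RDD.scala:100)"  // hypothetical call-site text
val stageCell =
  <div>
    <span class="expand-details">+ details</span>
    <pre class="stage-details collapsed">{details}</pre>
  </div>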

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 9 deletions

@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -224,7 +224,6 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration: Configuration = {
-    val env = SparkEnv.get
     val hadoopConf = SparkHadoopUtil.get.newConfiguration()
     // Explicitly check for S3 environment variables
     if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
@@ -1036,9 +1035,11 @@ class SparkContext(config: SparkConf) extends Logging {
   * Capture the current user callsite and return a formatted version for printing. If the user
   * has overridden the call site, this will return the user's version.
   */
-  private[spark] def getCallSite(): String = {
-    val defaultCallSite = Utils.getCallSiteInfo
-    Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
+  private[spark] def getCallSite(): CallSite = {
+    Option(getLocalProperty("externalCallSite")) match {
+      case Some(callSite) => CallSite(callSite, long = "")
+      case None => Utils.getCallSite
+    }
   }
 
   /**
@@ -1058,11 +1059,11 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }
 
@@ -1143,11 +1144,11 @@ class SparkContext(config: SparkConf) extends Logging {
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
 
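The diff constructs `CallSite(callSite, long = "")` and later reads `callSite.short`, so the new `CallSite` type in org.apache.spark.util is evidently a small holder with a short form and a long form. A minimal sketch of that shape (an inference from the usage above, not the actual source):

// Assumed shape of org.apache.spark.util.CallSite, inferred from the calls
// CallSite(callSite, long = "") and callSite.short in the diff above:
// the short form goes into log lines, the long form into detailed UI output.
case class CallSite(short: String, long: String)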

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 51 additions & 0 deletions

@@ -543,6 +543,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
 
+  /**
+   * For each key k in `this` or `other1` or `other2` or `other3`,
+   * return a resulting RDD that contains a tuple with the list of values
+   * for that key in `this`, `other1`, `other2` and `other3`.
+   */
+  def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      other3: JavaPairRDD[K, W3],
+      partitioner: Partitioner)
+      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
+    fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, partitioner)))
+
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
@@ -558,6 +570,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
 
+  /**
+   * For each key k in `this` or `other1` or `other2` or `other3`,
+   * return a resulting RDD that contains a tuple with the list of values
+   * for that key in `this`, `other1`, `other2` and `other3`.
+   */
+  def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      other3: JavaPairRDD[K, W3])
+      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
+    fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3)))
+
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
@@ -574,6 +597,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
 
+  /**
+   * For each key k in `this` or `other1` or `other2` or `other3`,
+   * return a resulting RDD that contains a tuple with the list of values
+   * for that key in `this`, `other1`, `other2` and `other3`.
+   */
+  def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      other3: JavaPairRDD[K, W3],
+      numPartitions: Int)
+      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
+    fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, numPartitions)))
+
   /** Alias for cogroup. */
   def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
     fromRDD(cogroupResultToJava(rdd.groupWith(other)))
@@ -583,6 +618,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
 
+  /** Alias for cogroup. */
+  def groupWith[W1, W2, W3](other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      other3: JavaPairRDD[K, W3])
+      : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
+    fromRDD(cogroupResult3ToJava(rdd.groupWith(other1, other2, other3)))
+
   /**
    * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
    * RDD has a known partitioner by only searching the partition that the key maps to.
@@ -786,6 +828,15 @@ object JavaPairRDD {
       .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
   }
 
+  private[spark]
+  def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
+      rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
+      : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
+    rddToPairRDDFunctions(rdd)
+      .mapValues(x =>
+        (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), asJavaIterable(x._4)))
+  }
+
   def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
     new JavaPairRDD[K, V](rdd)
   }
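All of the new overloads delegate to the Scala-side `rdd.cogroup(other1, other2, other3, ...)` and convert each grouped `Iterable` to a `java.lang.Iterable`. A small sketch of what the underlying four-way cogroup produces, assuming a running SparkContext named `sc` (hypothetical data):

// Hypothetical illustration of the four-way cogroup the Java wrappers delegate to.
import org.apache.spark.SparkContext._  // brings in PairRDDFunctions for cogroup
val a = sc.parallelize(Seq(1 -> "a1", 2 -> "a2"))
val b = sc.parallelize(Seq(1 -> "b1"))
val c = sc.parallelize(Seq(1 -> "c1"))
val d = sc.parallelize(Seq(2 -> "d2"))
// Every key found in any input maps to a tuple of four Iterables, one per RDD;
// for key 1 that is (Seq("a1"), Seq("b1"), Seq("c1"), Seq()).
val grouped = a.cogroup(b, c, d)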

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

Lines changed: 16 additions & 0 deletions

@@ -17,10 +17,13 @@
 
 package org.apache.spark.api.java
 
+import java.util.Comparator
+
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -172,6 +175,19 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
     rdd.setName(name)
     this
   }
+
+  /**
+   * Return this RDD sorted by the given key function.
+   */
+  def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
+    import scala.collection.JavaConverters._
+    def fn = (x: T) => f.call(x)
+    import com.google.common.collect.Ordering  // shadows scala.math.Ordering
+    implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
+    implicit val ctag: ClassTag[S] = fakeClassTag
+    wrapRDD(rdd.sortBy(fn, ascending, numPartitions))
+  }
+
 }
 
 object JavaRDD {
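The Java-facing `sortBy` simply adapts the `Function` to a Scala closure and forwards to the existing `RDD.sortBy`. A quick sketch of the equivalent Scala-side call, assuming a running SparkContext named `sc`:

// Hypothetical usage of the RDD-level sortBy that the new Java wrapper forwards to.
val words = sc.parallelize(Seq("pear", "fig", "apple"))
// Arguments mirror the Java signature: key function, sort direction, partition count.
val byLength = words.sortBy(w => w.length, true, 2)
// byLength.collect() would yield Array("fig", "pear", "apple")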

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala

Lines changed: 4 additions & 2 deletions

@@ -21,6 +21,8 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.log4j.Level
 
+import org.apache.spark.util.MemoryParam
+
 /**
  * Command-line parser for the driver client.
  */
@@ -51,8 +53,8 @@ private[spark] class ClientArguments(args: Array[String]) {
       cores = value.toInt
       parse(tail)
 
-    case ("--memory" | "-m") :: value :: tail =>
-      memory = value.toInt
+    case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
+      memory = value
       parse(tail)
 
     case ("--supervise" | "-s") :: tail =>

core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala

Lines changed: 2 additions & 2 deletions

@@ -19,9 +19,9 @@ package org.apache.spark.deploy
 
 private[spark] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }
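With `EXITED` added to both the enumeration and `isFinished`, an executor that terminates normally is now treated as finished rather than lingering in a non-terminal state:

// Quick check of the widened terminal-state set.
ExecutorState.isFinished(ExecutorState.EXITED)   // true after this change
ExecutorState.isFinished(ExecutorState.RUNNING)  // still false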

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 4 additions & 0 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import java.util.Date
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorRef
 
@@ -36,6 +37,7 @@ private[spark] class ApplicationInfo(
 
   @transient var state: ApplicationState.Value = _
   @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
+  @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
   @transient var coresGranted: Int = _
   @transient var endTime: Long = _
   @transient var appSource: ApplicationSource = _
@@ -51,6 +53,7 @@ private[spark] class ApplicationInfo(
     endTime = -1L
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
+    removedExecutors = new ArrayBuffer[ExecutorInfo]
  }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -74,6 +77,7 @@ private[spark] class ApplicationInfo(
 
   def removeExecutor(exec: ExecutorInfo) {
     if (executors.contains(exec.id)) {
+      removedExecutors += executors(exec.id)
       executors -= exec.id
      coresGranted -= exec.cores
    }

core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala

Lines changed: 15 additions & 0 deletions

@@ -34,4 +34,19 @@ private[spark] class ExecutorInfo(
   }
 
   def fullId: String = application.id + "/" + id
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case info: ExecutorInfo =>
+        fullId == info.fullId &&
+        worker.id == info.worker.id &&
+        cores == info.cores &&
+        memory == info.memory
+      case _ => false
+    }
+  }
+
+  override def toString: String = fullId
+
+  override def hashCode: Int = toString.hashCode()
 }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 2 deletions

@@ -303,10 +303,11 @@ private[spark] class Master(
           appInfo.removeExecutor(exec)
           exec.worker.removeExecutor(exec)
 
+          val normalExit = exitStatus.exists(_ == 0)
           // Only retry certain number of times so we don't go into an infinite loop.
-          if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+          if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
             schedule()
-          } else {
+          } else if (!normalExit) {
             logError("Application %s with ID %s failed %d times, removing it".format(
               appInfo.desc.name, appInfo.id, appInfo.retryCount))
             removeApplication(appInfo, ApplicationState.FAILED)
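`exitStatus` here is the exit code reported with the executor-state change (assumed to be an `Option[Int]`); a clean exit no longer counts against the application's retry budget. A one-line sketch of the guard:

// Assumes exitStatus: Option[Int]; exists(_ == 0) is true only when a status
// was reported and it was zero, i.e. the executor exited normally.
val exitStatus: Option[Int] = Some(0)
val normalExit = exitStatus.exists(_ == 0)  // true, so neither retry nor removal happens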
