Commit c3fdbab

Merge pull request #3 from markhamstra/master-csd
SPY-138 DAGScheduler job <--> stage accounting
2 parents 499fa12 + e5684e5

84 files changed: 5576 additions, 580 deletions


CHANGES.txt

Lines changed: 559 additions & 0 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 3 additions & 3 deletions
@@ -55,7 +55,7 @@ versions without YARN, use:
     # Cloudera CDH 4.2.0 with MapReduce v1
     $ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly

-For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
+For Apache Hadoop 2.0.X, 2.1.X, 2.2.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
 with YARN, also set `SPARK_YARN=true`:

     # Apache Hadoop 2.0.5-alpha
@@ -64,8 +64,8 @@ with YARN, also set `SPARK_YARN=true`:
     # Cloudera CDH 4.2.0 with MapReduce v2
     $ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly

-For convenience, these variables may also be set through the `conf/spark-env.sh` file
-described below.
+    # Apache Hadoop 2.2.0 with YARN
+    $ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly

 When developing a Spark application, specify the Hadoop version by adding the
 "hadoop-client" artifact to your project's dependencies. For example, if you're

assembly/pom.xml

Lines changed: 28 additions & 0 deletions
@@ -46,6 +46,23 @@
   </repositories>

   <dependencies>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-actor</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-remote</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-slf4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-zeromq</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.9.3</artifactId>
@@ -140,6 +157,17 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>new-yarn</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-yarn_2.9.3</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+
     <profile>
       <id>bigtop-dist</id>
       <!-- This profile uses the assembly plugin to create a special "dist" package for BigTop

bagel/pom.xml

Lines changed: 12 additions & 0 deletions
@@ -32,6 +32,18 @@
   <url>http://spark.incubator.apache.org/</url>

   <dependencies>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-actor</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-remote</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-slf4j</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.9.3</artifactId>

core/pom.xml

Lines changed: 3 additions & 7 deletions
@@ -95,15 +95,15 @@
       <version>0.3.1</version>
     </dependency>
     <dependency>
-      <groupId>com.typesafe.akka</groupId>
+      <groupId>${akka.group}</groupId>
       <artifactId>akka-actor</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.typesafe.akka</groupId>
+      <groupId>${akka.group}</groupId>
       <artifactId>akka-remote</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.typesafe.akka</groupId>
+      <groupId>${akka.group}</groupId>
       <artifactId>akka-slf4j</artifactId>
     </dependency>
     <dependency>
@@ -126,10 +126,6 @@
       <groupId>colt</groupId>
       <artifactId>colt</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.github.scala-incubator.io</groupId>
-      <artifactId>scala-io-file_2.9.2</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.mesos</groupId>
       <artifactId>mesos</artifactId>

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 6 additions & 2 deletions
@@ -247,7 +247,7 @@ private[spark] class MapOutputTracker extends Logging {
         case Some(bytes) =>
           return bytes
         case None =>
-          statuses = mapStatuses(shuffleId)
+          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
           epochGotten = epoch
       }
     }
@@ -261,9 +261,13 @@ private[spark] class MapOutputTracker extends Logging {
         cachedSerializedStatuses(shuffleId) = bytes
       }
     }
-    return bytes
+    bytes
   }

+  def has(shuffleId: Int): Boolean = {
+    cachedSerializedStatuses.get(shuffleId).isDefined || mapStatuses.contains(shuffleId)
+  }
+
   // Serialize an array of map output locations into an efficient byte format so that we can send
   // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
   // generally be pretty compressible because many map outputs will be on the same hostname.
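The new `has(shuffleId)` accessor reports whether the tracker already holds output statuses for a shuffle, either as cached serialized bytes or as raw MapStatus entries, without forcing serialization. A minimal sketch of a caller, assuming the pre-existing `registerShuffle` API and hypothetical names `tracker`, `shuffleId`, and `numMaps`:

    // Scala sketch: register a shuffle only if the tracker has no record of it,
    // so map outputs that already exist can be reused rather than recomputed.
    if (!tracker.has(shuffleId)) {
      tracker.registerShuffle(shuffleId, numMaps)
    }

This is the kind of guard the DAGScheduler's job <--> stage accounting (SPY-138) can apply when deciding whether a stage must be resubmitted.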

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 26 additions & 2 deletions
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.Map
 import scala.collection.generic.Growable
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap

@@ -82,7 +81,7 @@ class SparkContext(
     val sparkHome: String = null,
     val jars: Seq[String] = Nil,
     val environment: Map[String, String] = Map(),
-    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
+    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
     // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
     // of data-local splits on host
     val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@@ -227,6 +226,31 @@ class SparkContext(
         scheduler.initialize(backend)
         scheduler

+      case "yarn-client" =>
+        val scheduler = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+          val cons = clazz.getConstructor(classOf[SparkContext])
+          cons.newInstance(this).asInstanceOf[ClusterScheduler]
+
+        } catch {
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+
+        val backend = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+          val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
+          cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
+        } catch {
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+
+        scheduler.initialize(backend)
+        scheduler
+
       case _ =>
         if (MESOS_REGEX.findFirstIn(master).isEmpty) {
           logWarning("Master %s does not match expected format, parsing as Mesos URL".format(master))
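Loading the scheduler and backend reflectively keeps spark-core free of a compile-time dependency on the YARN module; if the classes are missing at runtime, the user gets the SparkException above rather than a linkage error. A minimal, hypothetical driver exercising the new master string (the app name and job are illustrative):

    // Scala sketch: "yarn-client" selects the reflectively loaded
    // YarnClientClusterScheduler and YarnClientSchedulerBackend.
    import org.apache.spark.SparkContext

    object YarnClientExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("yarn-client", "YarnClientExample")
        val sum = sc.parallelize(1 to 1000).reduce(_ + _)  // trivial job to verify scheduling
        println("sum = " + sum)
        sc.stop()
      }
    }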

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 14 additions & 14 deletions
@@ -1,19 +1,19 @@
 /*
  *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * *    http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  *
  */

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 3 additions & 3 deletions
@@ -17,12 +17,12 @@

 package org.apache.spark.deploy

-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.actor.ActorSystem

 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{Logging}
+import org.apache.spark.util.Utils
+import org.apache.spark.Logging

 import scala.collection.mutable.ArrayBuffer

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import akka.actor.{ActorRef, Actor, Props, Terminated}
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}

-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.Logging
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{Utils, AkkaUtils}
