
Commit 6bb9d91

Merge branch 'master' of github.com:apache/spark into streaming
2 parents: 54bd92b + 6e27cb6

29 files changed: 317 additions, 49 deletions


assembly/pom.xml

Lines changed: 10 additions & 0 deletions
@@ -349,5 +349,15 @@
       </plugins>
     </build>
   </profile>
+  <profile>
+    <id>kinesis-asl</id>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpclient</artifactId>
+        <version>${commons.httpclient.version}</version>
+      </dependency>
+    </dependencies>
+  </profile>
   </profiles>
 </project>

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 17 additions & 2 deletions
@@ -83,6 +83,15 @@ trait FutureAction[T] extends Future[T] {
    */
   @throws(classOf[Exception])
   def get(): T = Await.result(this, Duration.Inf)
+
+  /**
+   * Returns the job IDs run by the underlying async operation.
+   *
+   * This returns the current snapshot of the job list. Certain operations may run multiple
+   * jobs, so multiple calls to this method may return different lists.
+   */
+  def jobIds: Seq[Int]
+
 }
 
 
@@ -150,8 +159,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
     }
   }
 
-  /** Get the corresponding job id for this action. */
-  def jobId = jobWaiter.jobId
+  def jobIds = Seq(jobWaiter.jobId)
 }
 
 
@@ -171,6 +179,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
   // is cancelled before the action was even run (and thus we have no thread to interrupt).
   @volatile private var _cancelled: Boolean = false
 
+  @volatile private var jobs: Seq[Int] = Nil
+
   // A promise used to signal the future.
   private val p = promise[T]()
 
@@ -219,6 +229,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
       }
     }
 
+    this.jobs = jobs ++ job.jobIds
+
     // Wait for the job to complete. If the action is cancelled (with an interrupt),
     // cancel the job and stop the execution. This is not in a synchronized block because
     // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
@@ -255,4 +267,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
   override def isCompleted: Boolean = p.isCompleted
 
   override def value: Option[Try[T]] = p.future.value
+
+  def jobIds = jobs
+
 }
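
As a quick illustration of the new jobIds accessor (not part of the commit; the app name, master URL, and data below are arbitrary), a driver program can inspect which Spark jobs an async action ran:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // implicit conversion providing countAsync/takeAsync

object JobIdsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "JobIdsSketch")  // hypothetical local app
    val rdd = sc.parallelize(1 to 100, 4)

    // SimpleFutureAction wraps a single job, so jobIds has exactly one entry.
    val count = rdd.countAsync()
    Await.result(count, Duration.Inf)
    println("countAsync ran jobs: " + count.jobIds)

    // ComplexFutureAction (e.g. takeAsync) may launch several jobs; jobIds is a
    // snapshot that can grow while the action is still running.
    val take = rdd.takeAsync(10)
    Await.result(take, Duration.Inf)
    println("takeAsync ran jobs: " + take.jobIds)

    sc.stop()
  }
}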

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 56 additions & 4 deletions
@@ -23,6 +23,7 @@ import java.io.EOFException
 
 import scala.collection.immutable.Map
 import scala.reflect.ClassTag
+import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapred.FileSplit
@@ -43,6 +44,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.{NextIterator, Utils}
+import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
 
 
 /**
@@ -249,9 +251,21 @@
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    // TODO: Filtering out "localhost" in case of file:// URLs
-    val hadoopSplit = split.asInstanceOf[HadoopPartition]
-    hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
+    val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
+    val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
+      case Some(c) =>
+        try {
+          val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
+          val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
+          Some(HadoopRDD.convertSplitLocationInfo(infos))
+        } catch {
+          case e: Exception =>
+            logDebug("Failed to use InputSplitWithLocations.", e)
+            None
+        }
+      case None => None
+    }
+    locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
   }
 
   override def checkpoint() {
@@ -261,7 +275,7 @@
   def getConf: Configuration = getJobConf()
 }
 
-private[spark] object HadoopRDD {
+private[spark] object HadoopRDD extends Logging {
   /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
@@ -309,4 +323,42 @@ private[spark] object HadoopRDD {
       f(inputSplit, firstParent[T].iterator(split, context))
     }
   }
+
+  private[spark] class SplitInfoReflections {
+    val inputSplitWithLocationInfo =
+      Class.forName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
+    val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
+    val newInputSplit = Class.forName("org.apache.hadoop.mapreduce.InputSplit")
+    val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
+    val splitLocationInfo = Class.forName("org.apache.hadoop.mapred.SplitLocationInfo")
+    val isInMemory = splitLocationInfo.getMethod("isInMemory")
+    val getLocation = splitLocationInfo.getMethod("getLocation")
+  }
+
+  private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try {
+    Some(new SplitInfoReflections)
+  } catch {
+    case e: Exception =>
+      logDebug("SplitLocationInfo and other new Hadoop classes are " +
+        "unavailable. Using the older Hadoop location info code.", e)
+      None
+  }
+
+  private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
+    val out = ListBuffer[String]()
+    infos.foreach { loc => {
+      val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
+        getLocation.invoke(loc).asInstanceOf[String]
+      if (locationStr != "localhost") {
+        if (HadoopRDD.SPLIT_INFO_REFLECTIONS.get.isInMemory.
+            invoke(loc).asInstanceOf[Boolean]) {
+          logDebug("Partition " + locationStr + " is cached by Hadoop.")
+          out += new HDFSCacheTaskLocation(locationStr).toString
+        } else {
+          out += new HostTaskLocation(locationStr).toString
+        }
+      }
+    }}
+    out.seq
+  }
 }
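
The preferred-location change above depends on Hadoop 2.x classes that may not exist at runtime, so everything is resolved reflectively with a fallback to the old behavior. A condensed sketch of that pattern (the object and method names here are made up; only the Hadoop class and method names come from the diff):

object SplitLocationSketch {
  // Resolved once; None means the newer Hadoop location-info API is not on the classpath.
  private val getLocationInfoMethod: Option[java.lang.reflect.Method] =
    try {
      val cls = Class.forName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
      Some(cls.getMethod("getLocationInfo"))
    } catch {
      case _: ClassNotFoundException | _: NoSuchMethodException => None
    }

  /** Prefer the reflective location info; otherwise return the plain host list. */
  def locations(split: AnyRef, plainHosts: => Seq[String]): Seq[String] =
    getLocationInfoMethod match {
      case Some(m) if m.getDeclaringClass.isInstance(split) =>
        try {
          val infos = m.invoke(split).asInstanceOf[Array[AnyRef]]
          infos.map(_.toString).toSeq  // the real code maps these to TaskLocation strings
        } catch {
          case _: Exception => plainHosts
        }
      case _ => plainHosts
    }
}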

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 15 additions & 3 deletions
@@ -173,9 +173,21 @@ class NewHadoopRDD[K, V](
     new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
   }
 
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[NewHadoopPartition]
-    theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
+  override def getPreferredLocations(hsplit: Partition): Seq[String] = {
+    val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
+    val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
+      case Some(c) =>
+        try {
+          val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
+          Some(HadoopRDD.convertSplitLocationInfo(infos))
+        } catch {
+          case e : Exception =>
+            logDebug("Failed to use InputSplit#getLocationInfo.", e)
+            None
+        }
+      case None => None
+    }
+    locs.getOrElse(split.getLocations.filter(_ != "localhost"))
   }
 
   def getConf: Configuration = confBroadcast.value.value

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 1 addition & 1 deletion
@@ -208,7 +208,7 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Get the preferred locations of a partition (as hostnames), taking into account whether the
+   * Get the preferred locations of a partition, taking into account whether the
    * RDD is checkpointed.
    */
   final def preferredLocations(split: Partition): Seq[String] = {

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
@@ -1303,7 +1303,7 @@ class DAGScheduler(
     // If the RDD has some placement preferences (as is the case for input RDDs), get those
     val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
     if (!rddPrefs.isEmpty) {
-      return rddPrefs.map(host => TaskLocation(host))
+      return rddPrefs.map(TaskLocation(_))
     }
     // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
     // that has any placement preferences. Ideally we would choose based on transfer sizes,

core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala

Lines changed: 43 additions & 5 deletions
@@ -22,13 +22,51 @@ package org.apache.spark.scheduler
  * In the latter case, we will prefer to launch the task on that executorID, but our next level
  * of preference will be executors on the same host if this is not possible.
  */
-private[spark]
-class TaskLocation private (val host: String, val executorId: Option[String]) extends Serializable {
-  override def toString: String = "TaskLocation(" + host + ", " + executorId + ")"
+private[spark] sealed trait TaskLocation {
+  def host: String
+}
+
+/**
+ * A location that includes both a host and an executor id on that host.
+ */
+private [spark] case class ExecutorCacheTaskLocation(override val host: String,
+    val executorId: String) extends TaskLocation {
+}
+
+/**
+ * A location on a host.
+ */
+private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
+  override def toString = host
+}
+
+/**
+ * A location on a host that is cached by HDFS.
+ */
+private [spark] case class HDFSCacheTaskLocation(override val host: String)
+  extends TaskLocation {
+  override def toString = TaskLocation.inMemoryLocationTag + host
 }
 
 private[spark] object TaskLocation {
-  def apply(host: String, executorId: String) = new TaskLocation(host, Some(executorId))
+  // We identify hosts on which the block is cached with this prefix. Because this prefix contains
+  // underscores, which are not legal characters in hostnames, there should be no potential for
+  // confusion. See RFC 952 and RFC 1123 for information about the format of hostnames.
+  val inMemoryLocationTag = "hdfs_cache_"
+
+  def apply(host: String, executorId: String) = new ExecutorCacheTaskLocation(host, executorId)
 
-  def apply(host: String) = new TaskLocation(host, None)
+  /**
+   * Create a TaskLocation from a string returned by getPreferredLocations.
+   * These strings have the form [hostname] or hdfs_cache_[hostname], depending on whether the
+   * location is cached.
+   */
+  def apply(str: String) = {
+    val hstr = str.stripPrefix(inMemoryLocationTag)
+    if (hstr.equals(str)) {
+      new HostTaskLocation(str)
+    } else {
+      new HostTaskLocation(hstr)
+    }
+  }
 }
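
A small sketch (not part of the commit) of how the new location types round-trip through the strings returned by getPreferredLocations. The host names, the executor id, and the TaskLocationSketch object are invented, and the sketch sits in the scheduler package because the classes are private[spark]:

package org.apache.spark.scheduler

// Hypothetical demo object, placed in the same package as the private[spark] classes.
object TaskLocationSketch {
  def main(args: Array[String]): Unit = {
    // Plain host preference: the string form is just the hostname.
    val plain: TaskLocation = HostTaskLocation("host1")
    assert(plain.toString == "host1")

    // HDFS-cached preference: the hostname carries the hdfs_cache_ tag, which can
    // never collide with a real hostname (underscores are illegal in hostnames).
    val cached: TaskLocation = HDFSCacheTaskLocation("host2")
    assert(cached.toString == "hdfs_cache_host2")

    // Executor-level preference, built directly by the scheduler.
    val onExecutor = TaskLocation("host3", "exec-7")
    assert(onExecutor == ExecutorCacheTaskLocation("host3", "exec-7"))

    // getPreferredLocations hands the string forms back to the scheduler, which
    // reconstructs a TaskLocation; the cache tag is stripped to recover the host.
    assert(TaskLocation(cached.toString).host == "host2")
  }
}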

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 22 additions & 3 deletions
@@ -181,8 +181,24 @@ private[spark] class TaskSetManager(
     }
 
     for (loc <- tasks(index).preferredLocations) {
-      for (execId <- loc.executorId) {
-        addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
+      loc match {
+        case e: ExecutorCacheTaskLocation =>
+          addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
+        case e: HDFSCacheTaskLocation => {
+          val exe = sched.getExecutorsAliveOnHost(loc.host)
+          exe match {
+            case Some(set) => {
+              for (e <- set) {
+                addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
+              }
+              logInfo(s"Pending task $index has a cached location at ${e.host} " +
+                ", where there are executors " + set.mkString(","))
+            }
+            case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
+              ", but there are no executors alive there.")
+          }
+        }
+        case _ => Unit
       }
       addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
       for (rack <- sched.getRackForHost(loc.host)) {
@@ -283,7 +299,10 @@
       // on multiple nodes when we replicate cached blocks, as in Spark Streaming
       for (index <- speculatableTasks if canRunOnHost(index)) {
         val prefs = tasks(index).preferredLocations
-        val executors = prefs.flatMap(_.executorId)
+        val executors = prefs.flatMap(_ match {
+          case e: ExecutorCacheTaskLocation => Some(e.executorId)
+          case _ => None
+        });
         if (executors.contains(execId)) {
           speculatableTasks -= index
           return Some((index, TaskLocality.PROCESS_LOCAL))

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 1 deletion
@@ -1439,7 +1439,7 @@ private[spark] object Utils extends Logging {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
-      val tryPort = if (startPort == 0) startPort else (startPort + offset) % (65536 - 1024) + 1024
+      val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536
      try {
         val (service, port) = startService(tryPort)
         logInfo(s"Successfully started service$serviceString on port $port.")
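
For reference, a tiny standalone sketch (not from the commit) that evaluates the old and new wrap-around formulas for one hypothetical startPort and offset:

object PortWrapSketch {
  def main(args: Array[String]): Unit = {
    val startPort = 65530  // hypothetical non-zero starting port
    val offset = 10        // hypothetical retry attempt

    // Old formula: wraps into the non-privileged range [1024, 65535].
    val oldTryPort = (startPort + offset) % (65536 - 1024) + 1024
    // New formula: wraps at the top of the port range instead.
    val newTryPort = (startPort + offset) % 65536

    println(s"old: $oldTryPort, new: $newTryPort")  // prints "old: 2052, new: 4"
  }
}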
core/src/test/scala/org/apache/spark/FutureActionSuite.scala

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import org.apache.spark.SparkContext._
+
+class FutureActionSuite extends FunSuite with BeforeAndAfter with Matchers with LocalSparkContext {
+
+  before {
+    sc = new SparkContext("local", "FutureActionSuite")
+  }
+
+  test("simple async action") {
+    val rdd = sc.parallelize(1 to 10, 2)
+    val job = rdd.countAsync()
+    val res = Await.result(job, Duration.Inf)
+    res should be (10)
+    job.jobIds.size should be (1)
+  }
+
+  test("complex async action") {
+    val rdd = sc.parallelize(1 to 15, 3)
+    val job = rdd.takeAsync(10)
+    val res = Await.result(job, Duration.Inf)
+    res should be (1 to 10)
+    job.jobIds.size should be (2)
+  }
+
+}

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 22 additions & 0 deletions
@@ -642,6 +642,28 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.resourceOffer("execC", "host3", ANY) !== None)
   }
 
+  test("Test that locations with HDFSCacheTaskLocation are treated as PROCESS_LOCAL.") {
+    // Regression test for SPARK-2931
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc,
+        ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+    val taskSet = FakeTask.createTaskSet(3,
+        Seq(HostTaskLocation("host1")),
+        Seq(HostTaskLocation("host2")),
+        Seq(HDFSCacheTaskLocation("host3")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+    sched.removeExecutor("execA")
+    manager.executorAdded()
+    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+    sched.removeExecutor("execB")
+    manager.executorAdded()
+    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+    sched.removeExecutor("execC")
+    manager.executorAdded()
+    assert(manager.myLocalityLevels.sameElements(Array(ANY)))
+  }
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()

examples/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -43,6 +43,11 @@
       <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>${commons.httpclient.version}</version>
+    </dependency>
   </dependencies>
 </profile>
 </profiles>
