
Commit 9434b50

Author: Ilya Ganelin
Merge remote-tracking branch 'upstream/master' into SPARK-5845
2 parents: db8647e + 7c7d2d5

309 files changed: +9028 -3679 lines


LICENSE

Lines changed: 16 additions & 0 deletions
@@ -771,6 +771,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 
+========================================================================
+For TestTimSort (core/src/test/java/org/apache/spark/util/collection/TestTimSort.java):
+========================================================================
+Copyright (C) 2015 Stijn de Gouw
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 
 ========================================================================
 For LimitedInputStream

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

conf/metrics.properties.template

Lines changed: 9 additions & 0 deletions
@@ -122,6 +122,15 @@
 
 #worker.sink.csv.unit=minutes
 
+# Enable Slf4jSink for all instances by class name
+#*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
+
+# Polling period for Slf4JSink
+#*.sink.sl4j.period=1
+
+#*.sink.sl4j.unit=minutes
+
+
 # Enable jvm source for instance master, worker, driver and executor
 #master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
 
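
The entries above are added commented out; to actually emit metrics through Slf4jSink, the sink keys have to be uncommented and Spark pointed at the properties file. A minimal sketch in Scala, assuming a hypothetical file path and the standard spark.metrics.conf setting:

import org.apache.spark.{SparkConf, SparkContext}

object MetricsConfSketch {
  def main(args: Array[String]): Unit = {
    // Sketch only: /tmp/metrics.properties is a placeholder path; that file would hold the
    // uncommented sink entries from the template above, e.g.
    //   *.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("slf4j-metrics-sketch")
      .set("spark.metrics.conf", "/tmp/metrics.properties")
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).count()   // do a little work so some metrics get reported
    sc.stop()
  }
}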

core/pom.xml

Lines changed: 7 additions & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
@@ -319,6 +319,12 @@
       <artifactId>selenium-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- Added for selenium: -->
+    <dependency>
+      <groupId>xml-apis</groupId>
+      <artifactId>xml-apis</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>

core/src/main/java/org/apache/spark/util/collection/TimSort.java

Lines changed: 4 additions & 5 deletions
@@ -425,15 +425,14 @@ private void pushRun(int runBase, int runLen) {
     private void mergeCollapse() {
       while (stackSize > 1) {
         int n = stackSize - 2;
-        if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
+        if ( (n >= 1 && runLen[n-1] <= runLen[n] + runLen[n+1])
+          || (n >= 2 && runLen[n-2] <= runLen[n] + runLen[n-1])) {
           if (runLen[n - 1] < runLen[n + 1])
             n--;
-          mergeAt(n);
-        } else if (runLen[n] <= runLen[n + 1]) {
-          mergeAt(n);
-        } else {
+        } else if (runLen[n] > runLen[n + 1]) {
           break; // Invariant is established
         }
+        mergeAt(n);
       }
     }
 
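
This is the mergeCollapse fix for the TimSort invariant violation reported by Stijn de Gouw (hence the TestTimSort license entry above): the old check only looked one run deep, so the stack of pending runs could end up with lengths where an earlier run is no longer larger than the sum of the two after it. A small Scala sketch, not part of the commit, of the invariant the fixed code is meant to re-establish:

object TimSortInvariantSketch {
  // Sketch only: the invariant TimSort's mergeCollapse is meant to keep on the stack of
  // pending run lengths (names here are illustrative, not Spark or JDK APIs):
  //   runLen(i - 1) > runLen(i)  and  runLen(i - 2) > runLen(i - 1) + runLen(i)
  def satisfiesTimSortInvariant(runLen: IndexedSeq[Int]): Boolean = {
    val pairsOk = (1 until runLen.length).forall(i => runLen(i - 1) > runLen(i))
    val triplesOk = (2 until runLen.length).forall(i => runLen(i - 2) > runLen(i - 1) + runLen(i))
    pairsOk && triplesOk
  }
}

// Example: Vector(120, 80, 25) satisfies both rules, while Vector(120, 80, 45) breaks the
// second one (120 <= 80 + 45), so the fixed mergeCollapse would merge before pushing more runs.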

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 34 additions & 8 deletions
@@ -23,6 +23,7 @@ import java.lang.ThreadLocal
 
 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
+import scala.ref.WeakReference
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
@@ -279,13 +280,24 @@ object AccumulatorParam {
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
-  // TODO: Use soft references? => need to make readObject work properly then
-  val originals = Map[Long, Accumulable[_, _]]()
-  val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  /**
+   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+   */
+  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
     override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
-  var lastId: Long = 0
+
+  private var lastId: Long = 0
 
   def newId(): Long = synchronized {
     lastId += 1
@@ -294,7 +306,7 @@ private[spark] object Accumulators {
 
   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
-      originals(a.id) = a
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
     } else {
       localAccums.get()(a.id) = a
     }
@@ -303,7 +315,13 @@ private[spark] object Accumulators {
   // Clear the local (non-original) accumulators for the current thread
   def clear() {
     synchronized {
-      localAccums.get.clear
+      localAccums.get.clear()
+    }
+  }
+
+  def remove(accId: Long) {
+    synchronized {
+      originals.remove(accId)
     }
   }
 
@@ -320,7 +338,15 @@ private[spark] object Accumulators {
   def add(values: Map[Long, Any]): Unit = synchronized {
     for ((id, value) <- values) {
       if (originals.contains(id)) {
-        originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
+        // Since we are now storing weak references, we must check whether the underlying data
+        // is valid.
+        originals(id).get match {
+          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
+          case None =>
+            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
+        }
+      } else {
+        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
       }
     }
   }
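
Because originals now holds WeakReference values, a lookup can observe an accumulator that has already been garbage-collected, which is why add matches on WeakReference.get. A self-contained Scala sketch of that pattern (illustrative names only, not Spark code):

import scala.collection.mutable
import scala.ref.WeakReference

// Sketch: a registry that, like Accumulators.originals above, holds only weak
// references so registered values stay eligible for garbage collection.
object WeakRegistry {
  private val entries = mutable.Map[Long, WeakReference[AnyRef]]()

  def register(id: Long, value: AnyRef): Unit = synchronized {
    entries(id) = new WeakReference[AnyRef](value)
  }

  def lookup(id: Long): Option[AnyRef] = synchronized {
    // WeakReference.get returns None once the referent has been collected,
    // mirroring the match on originals(id).get in the diff above.
    entries.get(id).flatMap(_.get)
  }

  def remove(id: Long): Unit = synchronized {
    entries.remove(id)
  }
}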

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 47 additions & 13 deletions
@@ -32,6 +32,7 @@ private sealed trait CleanupTask
 private case class CleanRDD(rddId: Int) extends CleanupTask
 private case class CleanShuffle(shuffleId: Int) extends CleanupTask
 private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
+private case class CleanAccum(accId: Long) extends CleanupTask
 
 /**
  * A WeakReference associated with a CleanupTask.
@@ -104,16 +105,30 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
   def registerRDDForCleanup(rdd: RDD[_]) {
     registerForCleanup(rdd, CleanRDD(rdd.id))
   }
 
+  def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+    registerForCleanup(a, CleanAccum(a.id))
+  }
+
   /** Register a ShuffleDependency for cleanup when it is garbage collected. */
   def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]) {
     registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
@@ -135,19 +150,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+              case CleanAccum(accId) =>
+                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+            }
          }
        }
       } catch {
+        case ie: InterruptedException if stopped => // ignore
         case e: Exception => logError("Error in cleaning thread", e)
       }
     }
@@ -181,15 +202,27 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Perform broadcast cleanup. */
   def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
     try {
-      logDebug("Cleaning broadcast " + broadcastId)
+      logDebug(s"Cleaning broadcast $broadcastId")
       broadcastManager.unbroadcast(broadcastId, true, blocking)
       listeners.foreach(_.broadcastCleaned(broadcastId))
-      logInfo("Cleaned broadcast " + broadcastId)
+      logDebug(s"Cleaned broadcast $broadcastId")
     } catch {
       case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
     }
   }
 
+  /** Perform accumulator cleanup. */
+  def doCleanupAccum(accId: Long, blocking: Boolean) {
+    try {
+      logDebug("Cleaning accumulator " + accId)
+      Accumulators.remove(accId)
+      listeners.foreach(_.accumCleaned(accId))
+      logInfo("Cleaned accumulator " + accId)
+    } catch {
+      case e: Exception => logError("Error cleaning accumulator " + accId, e)
+    }
+  }
+
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
@@ -206,4 +239,5 @@ private[spark] trait CleanerListener {
   def rddCleaned(rddId: Int)
   def shuffleCleaned(shuffleId: Int)
   def broadcastCleaned(broadcastId: Long)
+  def accumCleaned(accId: Long)
 }
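
ContextCleaner dispatches these doCleanup* calls from weak references that land on a reference queue once the registered objects are garbage-collected. A minimal sketch of that queue-polling pattern in plain Scala (illustrative, not the Spark implementation):

import java.lang.ref.{ReferenceQueue, WeakReference}

// Sketch: associate a cleanup task with an object via a weak reference, then poll
// the reference queue to run the task once the object has been garbage-collected.
final class CleanupRef[A <: AnyRef](
    referent: A,
    val task: () => Unit,
    queue: ReferenceQueue[A]) extends WeakReference[A](referent, queue)

object CleanupLoopSketch {
  def pollOnce[A <: AnyRef](queue: ReferenceQueue[A], timeoutMs: Long): Unit = {
    // remove() blocks for up to timeoutMs and returns null if nothing was enqueued.
    val ref = queue.remove(timeoutMs)
    ref match {
      case r: CleanupRef[_] => r.task()   // run the registered cleanup
      case _ =>                           // timeout or unrelated reference: nothing to do
    }
  }
}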

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 59 additions & 6 deletions
@@ -17,33 +17,86 @@
 
 package org.apache.spark
 
-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
 import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not heartbeated for more than spark.network.timeout.
  */
 private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+private[spark] case object ExpireDeadHosts
+
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+
+  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
+    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000
+
+  private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
+    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000
+
+  private var timeoutCheckingTask: Cancellable = null
+
+  override def preStart(): Unit = {
+    import context.dispatcher
+    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+      checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
+    super.preStart()
+  }
+
   override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val response = HeartbeatResponse(
-        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      val unknownExecutor = !scheduler.executorHeartbeatReceived(
+        executorId, taskMetrics, blockManagerId)
+      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+      executorLastSeen(executorId) = System.currentTimeMillis()
       sender ! response
+    case ExpireDeadHosts =>
+      expireDeadHosts()
+  }
+
+  private def expireDeadHosts(): Unit = {
+    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    for ((executorId, lastSeenMs) <- executorLastSeen) {
+      if (now - lastSeenMs > executorTimeoutMs) {
+        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
+        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
          "timed out after ${now - lastSeenMs} ms"))
+        if (sc.supportDynamicAllocation) {
+          sc.killExecutor(executorId)
+        }
+        executorLastSeen.remove(executorId)
+      }
+    }
+  }
+
+  override def postStop(): Unit = {
+    if (timeoutCheckingTask != null) {
+      timeoutCheckingTask.cancel()
+    }
+    super.postStop()
   }
 }
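
The ExpireDeadHosts handling amounts to keeping a last-seen timestamp per executor and sweeping out entries older than the configured timeout. A stripped-down Scala sketch of that bookkeeping, with hypothetical names and no Akka or SparkContext involved:

import scala.collection.mutable

// Sketch: last-seen tracking as in executorLastSeen above, with an expiry sweep that
// returns the IDs whose most recent heartbeat is older than timeoutMs.
final class HeartbeatTracker(timeoutMs: Long) {
  private val lastSeen = mutable.HashMap[String, Long]()

  def recordHeartbeat(executorId: String, nowMs: Long = System.currentTimeMillis()): Unit =
    lastSeen(executorId) = nowMs

  def expire(nowMs: Long = System.currentTimeMillis()): Seq[String] = {
    val expired = lastSeen.collect {
      case (id, seen) if nowMs - seen > timeoutMs => id
    }.toSeq
    expired.foreach(lastSeen.remove)   // drop expired entries, as expireDeadHosts does
    expired
  }
}

// Usage sketch: val tracker = new HeartbeatTracker(timeoutMs = 120000L)
// tracker.recordHeartbeat("executor-1"); later, tracker.expire() yields the timed-out IDs.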
