
Commit 86a87bf

Browse files
Author: Marcelo Vanzin (committed)

Merge branch 'master' into SPARK-4924

2 parents: 2061967 + fd2c032

File tree: 101 files changed (+5880, -3582 lines)


core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 11 additions & 3 deletions
@@ -19,24 +19,32 @@ package org.apache.spark
 
 /**
  * A client that communicates with the cluster manager to request or kill executors.
+ * This is currently supported only in YARN mode.
  */
 private[spark] trait ExecutorAllocationClient {
 
+  /**
+   * Express a preference to the cluster manager for a given total number of executors.
+   * This can result in canceling pending requests or filing additional requests.
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+
   /**
    * Request an additional number of executors from the cluster manager.
-   * Return whether the request is acknowledged by the cluster manager.
+   * @return whether the request is acknowledged by the cluster manager.
    */
   def requestExecutors(numAdditionalExecutors: Int): Boolean
 
   /**
    * Request that the cluster manager kill the specified executors.
-   * Return whether the request is acknowledged by the cluster manager.
+   * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutors(executorIds: Seq[String]): Boolean
 
   /**
    * Request that the cluster manager kill the specified executor.
-   * Return whether the request is acknowledged by the cluster manager.
+   * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
 }
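To make the new contract concrete, here is a minimal stub sketch (not part of this commit; the class and its bookkeeping are hypothetical): requestTotalExecutors carries absolute semantics and may cancel outstanding requests, while requestExecutors is purely additive.

  // Hypothetical in-memory stub of ExecutorAllocationClient, for illustration only.
  private[spark] class StubAllocationClient extends ExecutorAllocationClient {
    private var target = 0                                    // requested total executors
    private val alive = scala.collection.mutable.Set.empty[String]

    private[spark] def requestTotalExecutors(numExecutors: Int): Boolean = {
      target = numExecutors   // absolute: lowering the target cancels pending requests
      true
    }

    def requestExecutors(numAdditionalExecutors: Int): Boolean = {
      target += numAdditionalExecutors   // relative: the target can only grow
      true
    }

    def killExecutors(executorIds: Seq[String]): Boolean = {
      executorIds.foreach(alive -= _)   // killExecutor delegates here via the trait default
      true
    }
  }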

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 104 additions & 45 deletions
@@ -201,18 +201,34 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * The number of executors we would have if the cluster manager were to fulfill all our existing
+   * requests.
+   */
+  private def targetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
+  /**
+   * The maximum number of executors we would need under the current load to satisfy all running
+   * and pending tasks, rounded up.
+   */
+  private def maxNumExecutorsNeeded(): Int = {
+    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+  }
+
+  /**
+   * This is called at a fixed interval to regulate the number of pending executor requests
+   * and number of executors running.
+   *
+   * First, adjust our requested executors based on the add time and our current needs.
+   * Then, if the remove time for an existing executor has expired, kill the executor.
+   *
    * This is factored out into its own method for testing.
    */
   private def schedule(): Unit = synchronized {
     val now = clock.getTimeMillis
-    if (addTime != NOT_SET && now >= addTime) {
-      addExecutors()
-      logDebug(s"Starting timer to add more executors (to " +
-        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
-      addTime += sustainedSchedulerBacklogTimeout * 1000
-    }
+
+    addOrCancelExecutorRequests(now)
 
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
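The rounding in maxNumExecutorsNeeded is the standard integer ceiling-division trick. A quick worked example with assumed counts (the numbers are illustrative, not from this commit):

  val tasksPerExecutor = 8
  val numRunningOrPendingTasks = 10 + 47   // 10 running + 47 pending = 57 tasks
  val needed = (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  assert(needed == 8)   // plain 57 / 8 would round down to 7 and strand one task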
@@ -223,59 +239,89 @@ private[spark] class ExecutorAllocationManager(
     }
   }
 
+  /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
+      client.requestTotalExecutors(newTotalExecutors)
+      numExecutorsToAdd = 1
+      updateNumExecutorsPending(newTotalExecutors)
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
+
   /**
    * Request a number of executors from the cluster manager.
    * If the cap on the number of executors is reached, give up and reset the
    * number of executors to add next round instead of continuing to double it.
-   * Return the number actually requested.
+   *
+   * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
+   *                              tasks could fill
+   * @return the number of additional executors actually requested.
    */
-  private def addExecutors(): Int = synchronized {
-    // Do not request more executors if we have already reached the upper bound
-    val numExistingExecutors = executorIds.size + numExecutorsPending
-    if (numExistingExecutors >= maxNumExecutors) {
+  private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+    // Do not request more executors if it would put our target over the upper bound
+    val currentTarget = targetNumExecutors
+    if (currentTarget >= maxNumExecutors) {
       logDebug(s"Not adding executors because there are already ${executorIds.size} " +
         s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
 
-    // The number of executors needed to satisfy all pending tasks is the number of tasks pending
-    // divided by the number of tasks each executor can fit, rounded up.
-    val maxNumExecutorsPending =
-      (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
-    if (numExecutorsPending >= maxNumExecutorsPending) {
-      logDebug(s"Not adding executors because there are already $numExecutorsPending " +
-        s"pending and pending tasks could only fill $maxNumExecutorsPending")
-      numExecutorsToAdd = 1
-      return 0
-    }
-
-    // It's never useful to request more executors than could satisfy all the pending tasks, so
-    // cap request at that amount.
-    // Also cap request with respect to the configured upper bound.
-    val maxNumExecutorsToAdd = math.min(
-      maxNumExecutorsPending - numExecutorsPending,
-      maxNumExecutors - numExistingExecutors)
-    assert(maxNumExecutorsToAdd > 0)
-
-    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
-    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+    val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
+    val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
+    val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
     if (addRequestAcknowledged) {
-      logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
-        s"tasks are backlogged (new desired total will be $newTotalExecutors)")
-      numExecutorsToAdd =
-        if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
-      numExecutorsPending += actualNumExecutorsToAdd
-      actualNumExecutorsToAdd
+      val delta = updateNumExecutorsPending(newTotalExecutors)
+      logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
+        s" (new desired total will be $newTotalExecutors)")
+      numExecutorsToAdd = if (delta == numExecutorsToAdd) {
+        numExecutorsToAdd * 2
+      } else {
+        1
+      }
+      delta
     } else {
-      logWarning(s"Unable to reach the cluster manager " +
-        s"to request $actualNumExecutorsToAdd executors!")
+      logWarning(
+        s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
       0
     }
   }
 
+  /**
+   * Given the new target number of executors, update the number of pending executor requests,
+   * and return the delta from the old number of pending requests.
+   */
+  private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
+    val newNumExecutorsPending =
+      newTotalExecutors - executorIds.size + executorsPendingToRemove.size
+    val delta = newNumExecutorsPending - numExecutorsPending
+    numExecutorsPending = newNumExecutorsPending
+    delta
+  }
+
   /**
    * Request the cluster manager to remove the given executor.
    * Return whether the request is received.
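Together, addExecutors and updateNumExecutorsPending implement an exponential ramp-up capped by both the configured maximum and the current need. A self-contained sketch of that policy (illustrative constants and a simplified loop, not the manager's actual code path):

  object RampUpSketch extends App {
    val maxNumExecutors = 20
    var target = 2             // stands in for targetNumExecutors
    var numExecutorsToAdd = 1
    for (round <- 1 to 6) {
      val maxNeeded = 17       // assume a steady task backlog
      val cap = math.min(maxNumExecutors, maxNeeded)
      val newTotal = math.min(target + numExecutorsToAdd, cap)
      val delta = newTotal - target
      target = newTotal
      // Double the step only when the full step was granted, else reset to 1.
      numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
      println(s"round $round: target=$target delta=$delta")
    }
    // Targets grow 3, 5, 9, 17; once the cap clips the step, it resets to 1.
  }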
@@ -415,6 +461,8 @@ private[spark] class ExecutorAllocationManager(
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
    private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
+    // Number of tasks currently running on the cluster. Should be 0 when no stages are active.
+    private var numRunningTasks: Int = _
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
       val stageId = stageSubmitted.stageInfo.stageId
@@ -435,6 +483,10 @@ private[spark] class ExecutorAllocationManager(
         // This is needed in case the stage is aborted for any reason
         if (stageIdToNumTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
+          if (numRunningTasks != 0) {
+            logWarning("No stages are running, but numRunningTasks != 0")
+            numRunningTasks = 0
+          }
         }
       }
     }
@@ -446,6 +498,7 @@ private[spark] class ExecutorAllocationManager(
       val executorId = taskStart.taskInfo.executorId
 
       allocationManager.synchronized {
+        numRunningTasks += 1
         // This guards against the race condition in which the `SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, which is
         // possible because these events are posted in different threads. (see SPARK-4951)
@@ -475,7 +528,8 @@ private[spark] class ExecutorAllocationManager(
       val executorId = taskEnd.taskInfo.executorId
       val taskId = taskEnd.taskInfo.taskId
       allocationManager.synchronized {
-        // If the executor is no longer running scheduled any tasks, mark it as idle
+        numRunningTasks -= 1
+        // If the executor is no longer running any scheduled tasks, mark it as idle
         if (executorIdToTaskIds.contains(executorId)) {
           executorIdToTaskIds(executorId) -= taskId
           if (executorIdToTaskIds(executorId).isEmpty) {
@@ -514,6 +568,11 @@ private[spark] class ExecutorAllocationManager(
       }.sum
     }
 
+    /**
+     * The number of tasks currently running across all stages.
+     */
+    def totalRunningTasks(): Int = numRunningTasks
+
     /**
      * Return true if an executor is not currently running a task, and false otherwise.
      *
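The listener changes above amount to a guarded counter: increment on task start, decrement on task end, and a defensive reset when no stages remain. The same pattern in isolation (a hypothetical standalone class, not Spark code):

  class RunningTaskCounter {
    private var numRunningTasks = 0
    def onTaskStart(): Unit = synchronized { numRunningTasks += 1 }
    def onTaskEnd(): Unit = synchronized { numRunningTasks -= 1 }
    // Mirrors the diff's warning path: no active stages should mean no running tasks.
    def onAllStagesCompleted(): Unit = synchronized {
      if (numRunningTasks != 0) {
        System.err.println("No stages are running, but numRunningTasks != 0")
        numRunningTasks = 0
      }
    }
    def totalRunningTasks: Int = synchronized { numRunningTasks }
  }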

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 77 additions & 6 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
@@ -67,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    settings.put(key, value)
+    settings.put(translateConfKey(key, warn = true), value)
     this
   }

@@ -139,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    settings.putIfAbsent(key, value)
+    settings.putIfAbsent(translateConfKey(key, warn = true), value)
     this
   }

@@ -175,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(key))
+    Option(settings.get(translateConfKey(key)))
   }
 
   /** Get all parameters as a list of pairs */
@@ -228,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def getAppId: String = get("spark.app.id")
 
   /** Does the configuration contain a given parameter? */
-  def contains(key: String): Boolean = settings.containsKey(key)
+  def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
 
   /** Copy this object */
   override def clone: SparkConf = {
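Taken together, these call sites translate deprecated keys on both writes and reads, but only the write paths pass warn = true. A usage sketch, assuming code inside the org.apache.spark package (translateConfKey and the deprecation table are added later in this diff):

  val conf = new SparkConf(loadDefaults = false)
  conf.set("spark.files.userClassPathFirst", "true")   // warns once; stored under the new key
  assert(conf.contains("spark.executor.userClassPathFirst"))
  assert(conf.getOption("spark.files.userClassPathFirst") == Some("true"))   // reads translate silently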
@@ -285,7 +286,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     // Validate memory fractions
     val memoryKeys = Seq(
       "spark.storage.memoryFraction",
-      "spark.shuffle.memoryFraction", 
+      "spark.shuffle.memoryFraction",
       "spark.shuffle.safetyFraction",
       "spark.storage.unrollFraction",
       "spark.storage.safetyFraction")
@@ -351,9 +352,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def toDebugString: String = {
     getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
+
 }
 
-private[spark] object SparkConf {
+private[spark] object SparkConf extends Logging {
+
+  private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
+    val configs = Seq(
+      DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
+        "1.3"),
+      DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
+        "Use spark.{driver,executor}.userClassPathFirst instead."))
+    configs.map { x => (x.oldName, x) }.toMap
+  }
+
   /**
    * Return whether the given config is an akka config (e.g. akka.actor.provider).
    * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
@@ -380,4 +392,63 @@ private[spark] object SparkConf {
   def isSparkPortConf(name: String): Boolean = {
     (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
   }
+
+  /**
+   * Translate the configuration key if it is deprecated and has a replacement, otherwise just
+   * returns the provided key.
+   *
+   * @param userKey Configuration key from the user / caller.
+   * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
+   *             only once for each key.
+   */
+  def translateConfKey(userKey: String, warn: Boolean = false): String = {
+    deprecatedConfigs.get(userKey)
+      .map { deprecatedKey =>
+        if (warn) {
+          deprecatedKey.warn()
+        }
+        deprecatedKey.newName.getOrElse(userKey)
+      }.getOrElse(userKey)
+  }
+
+  /**
+   * Holds information about keys that have been deprecated or renamed.
+   *
+   * @param oldName Old configuration key.
+   * @param newName New configuration key, or `null` if key has no replacement, in which case the
+   *                deprecated key will be used (but the warning message will still be printed).
+   * @param version Version of Spark where key was deprecated.
+   * @param deprecationMessage Message to include in the deprecation warning; mandatory when
+   *                           `newName` is not provided.
+   */
+  private case class DeprecatedConfig(
+      oldName: String,
+      _newName: String,
+      version: String,
+      deprecationMessage: String = null) {
+
+    private val warned = new AtomicBoolean(false)
+    val newName = Option(_newName)
+
+    if (newName == null && (deprecationMessage == null || deprecationMessage.isEmpty())) {
+      throw new IllegalArgumentException("Need new config name or deprecation message.")
+    }
+
+    def warn(): Unit = {
+      if (warned.compareAndSet(false, true)) {
+        if (newName != null) {
+          val message = Option(deprecationMessage).getOrElse(
+            s"Please use the alternative '$newName' instead.")
+          logWarning(
+            s"The configuration option '$oldName' has been replaced as of Spark $version and " +
+            s"may be removed in the future. $message")
+        } else {
+          logWarning(
+            s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
+            s"may be removed in the future. $deprecationMessage")
+        }
+      }
+    }
+  }
 }
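The warn-once behavior in DeprecatedConfig.warn() rests on AtomicBoolean.compareAndSet, which succeeds for exactly one caller even under contention. The pattern in isolation (a hypothetical standalone class, with System.err standing in for logWarning):

  import java.util.concurrent.atomic.AtomicBoolean

  class WarnOnce(message: String) {
    private val warned = new AtomicBoolean(false)
    def warn(): Unit = {
      // compareAndSet(false, true) flips the flag for exactly one thread.
      if (warned.compareAndSet(false, true)) {
        System.err.println(message)
      }
    }
  }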
