@@ -27,13 +27,20 @@ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
  *
- * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
- * the scheduler queue is not drained in N seconds, then new executors are added. If the queue
- * persists for another M seconds, then more executors are added and so on. The number added
- * in each round increases exponentially from the previous round until an upper bound on the
- * number of executors has been reached. The upper bound is based both on a configured property
- * and on the number of tasks pending: the policy will never increase the number of executor
- * requests past the number needed to handle all pending tasks.
+ * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
+ * synced to the cluster manager. The target starts at a configured initial value and changes with
+ * the number of pending and running tasks.
+ *
+ * Decreasing the target number of executors happens when the current target is more than needed to
+ * handle the current load. The target number of executors is always truncated to the number of
+ * executors that could run all current running and pending tasks at once.
+ *
+ * Increasing the target number of executors happens in response to backlogged tasks waiting to be
+ * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
+ * the queue persists for another M seconds, then more executors are added and so on. The number
+ * added in each round increases exponentially from the previous round until an upper bound has been
+ * reached. The upper bound is based both on a configured property and on the current number of
+ * running and pending tasks, as described above.
  *
  * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
  * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
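To make the ramp-up policy described in the new doc comment concrete, here is a minimal standalone Scala sketch (hypothetical names, not part of this diff): each round the requested batch doubles only if the previous batch was granted in full, and the target stays capped by the current need and the configured maximum.

// Sketch of the exponential ramp-up described above (illustrative only).
object RampUpSketch {
  def step(target: Int, toAdd: Int, maxNeeded: Int, maxExecutors: Int): (Int, Int) = {
    val newTarget = math.min(math.min(target + toAdd, maxNeeded), maxExecutors)
    val delta = newTarget - target
    // Double the batch only if the full batch was granted; otherwise start over at 1.
    val nextToAdd = if (delta == toAdd) toAdd * 2 else 1
    (newTarget, nextToAdd)
  }

  def main(args: Array[String]): Unit = {
    var target = 1
    var toAdd = 1
    for (round <- 1 to 5) {
      val (t, a) = step(target, toAdd, maxNeeded = 20, maxExecutors = 100)
      println(s"round $round: target=$t nextToAdd=$a") // targets: 2, 4, 8, 16, 20
      target = t
      toAdd = a
    }
  }
}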
@@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager(
   // Number of executors to add in the next round
   private var numExecutorsToAdd = 1
 
-  // Number of executors that have been requested but have not registered yet
-  private var numExecutorsPending = 0
+  // The desired number of executors at this moment in time. If all our executors were to die, this
+  // is the number of executors we would immediately want from the cluster manager.
+  private var numExecutorsTarget =
+    conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
 
   // Executors that have been requested to be removed but have not been killed yet
   private val executorsPendingToRemove = new mutable.HashSet[String]
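For context on the new initial value: it is read from configuration and falls back to the configured minimum. A hedged sketch of the relevant settings on a SparkConf (values are illustrative):

import org.apache.spark.SparkConf

object DynamicAllocationConfSketch {
  // Illustrative values; initialExecutors falls back to minExecutors when unset.
  val conf: SparkConf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.initialExecutors", "4")
    .set("spark.dynamicAllocation.maxExecutors", "50")
}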
@@ -199,13 +208,6 @@ private[spark] class ExecutorAllocationManager(
     executor.awaitTermination(10, TimeUnit.SECONDS)
   }
 
-  /**
-   * The number of executors we would have if the cluster manager were to fulfill all our existing
-   * requests.
-   */
-  private def targetNumExecutors(): Int =
-    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
-
   /**
    * The maximum number of executors we would need under the current load to satisfy all running
    * and pending tasks, rounded up.
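The quantity described in the doc comment above is a rounded-up division; a minimal sketch of that arithmetic (hypothetical names and parameters, not the actual method body):

object MaxNeededSketch {
  // Executors needed to run every pending and running task at once, rounded up.
  def maxExecutorsNeeded(pendingAndRunningTasks: Int, tasksPerExecutor: Int): Int =
    (pendingAndRunningTasks + tasksPerExecutor - 1) / tasksPerExecutor

  // e.g. maxExecutorsNeeded(23, 4) == 6
}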
@@ -227,7 +229,7 @@ private[spark] class ExecutorAllocationManager(
   private def schedule(): Unit = synchronized {
     val now = clock.getTimeMillis
 
-    addOrCancelExecutorRequests(now)
+    updateAndSyncNumExecutorsTarget(now)
 
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
@@ -239,26 +241,28 @@ private[spark] class ExecutorAllocationManager(
     }
   }
 
   /**
+   * Updates our target number of executors and syncs the result with the cluster manager.
+   *
    * Check to see whether our existing allocation and the requests we've made previously exceed our
-   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
-   * are unneeded.
+   * current needs. If so, truncate our target and let the cluster manager know so that it can
+   * cancel pending requests that are unneeded.
    *
    * If not, and the add time has expired, see if we can request new executors and refresh the add
    * time.
    *
    * @return the delta in the target number of executors.
    */
-  private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
-    val currentTarget = targetNumExecutors
+  private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
     val maxNeeded = maxNumExecutorsNeeded
 
-    if (maxNeeded < currentTarget) {
+    if (maxNeeded < numExecutorsTarget) {
       // The target number exceeds the number we actually need, so stop adding new
-      // executors and inform the cluster manager to cancel the extra pending requests.
-      val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
-      client.requestTotalExecutors(newTotalExecutors)
+      // executors and inform the cluster manager to cancel the extra pending requests
+      val oldNumExecutorsTarget = numExecutorsTarget
+      numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
+      client.requestTotalExecutors(numExecutorsTarget)
       numExecutorsToAdd = 1
-      updateNumExecutorsPending(newTotalExecutors)
+      numExecutorsTarget - oldNumExecutorsTarget
     } else if (addTime != NOT_SET && now >= addTime) {
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
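The truncation branch above can be read as a pure function over the target; a hedged sketch (illustrative names, ignoring the sync call and timers):

object TruncateSketch {
  // Clamp an over-sized target down to the current need, never below the configured minimum.
  def truncatedTarget(currentTarget: Int, maxNeeded: Int, minExecutors: Int): Int =
    if (maxNeeded < currentTarget) math.max(maxNeeded, minExecutors) else currentTarget
}

The delta the real method returns is then simply the new target minus the old one.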
@@ -281,21 +285,30 @@ private[spark] class ExecutorAllocationManager(
   */
   private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
     // Do not request more executors if it would put our target over the upper bound
-    val currentTarget = targetNumExecutors
-    if (currentTarget >= maxNumExecutors) {
-      logDebug(s"Not adding executors because there are already ${executorIds.size} " +
-        s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+    if (numExecutorsTarget >= maxNumExecutors) {
+      val numExecutorsPending = numExecutorsTarget - executorIds.size
+      logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
+        s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
 
-    val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
-    val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
-    val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
+    val oldNumExecutorsTarget = numExecutorsTarget
+    // There's no point in wasting time ramping up to the number of executors we already have, so
+    // make sure our target is at least as much as our current allocation:
+    numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
+    // Boost our target with the number to add for this round:
+    numExecutorsTarget += numExecutorsToAdd
+    // Ensure that our target doesn't exceed what we need at the present moment:
+    numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
+    // Ensure that our target fits within configured bounds:
+    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+
+    val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
     if (addRequestAcknowledged) {
-      val delta = updateNumExecutorsPending(newTotalExecutors)
+      val delta = numExecutorsTarget - oldNumExecutorsTarget
       logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
-        s" (new desired total will be $newTotalExecutors)")
+        s" (new desired total will be $numExecutorsTarget)")
       numExecutorsToAdd = if (delta == numExecutorsToAdd) {
         numExecutorsToAdd * 2
       } else {
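The four target updates above amount to a single clamp; an equivalent one-expression sketch (illustrative only, the real method mutates numExecutorsTarget in place):

object BoostSketch {
  // One-expression equivalent of the step-by-step updates to the target above.
  def boostedTarget(
      target: Int,
      registered: Int,
      toAdd: Int,
      maxNeeded: Int,
      minExecutors: Int,
      maxExecutors: Int): Int = {
    val boosted = math.max(target, registered) + toAdd
    math.max(math.min(math.min(boosted, maxNeeded), maxExecutors), minExecutors)
  }
}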
@@ -304,23 +317,11 @@ private[spark] class ExecutorAllocationManager(
       delta
     } else {
       logWarning(
-        s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
+        s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
       0
     }
   }
 
-  /**
-   * Given the new target number of executors, update the number of pending executor requests,
-   * and return the delta from the old number of pending requests.
-   */
-  private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
-    val newNumExecutorsPending =
-      newTotalExecutors - executorIds.size + executorsPendingToRemove.size
-    val delta = newNumExecutorsPending - numExecutorsPending
-    numExecutorsPending = newNumExecutorsPending
-    delta
-  }
-
   /**
    * Request the cluster manager to remove the given executor.
    * Return whether the request is received.
@@ -372,10 +373,6 @@ private[spark] class ExecutorAllocationManager(
       // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
       executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
-      if (numExecutorsPending > 0) {
-        numExecutorsPending -= 1
-        logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
-      }
     } else {
       logWarning(s"Duplicate executor $executorId has registered")
     }