Skip to content

Commit 8387a3a

Browse files
author
Marcelo Vanzin
committed
[SPARK-8059] [yarn] Wake up allocation thread when new requests arrive.
This should help reduce latency for new executor allocations.
1 parent 5cd6a63 commit 8387a3a

File tree

2 files changed: 19 additions and 4 deletions

2 files changed: 19 additions and 4 deletions

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,7 @@ private[spark] class ApplicationMaster(
6767

6868
@volatile private var reporterThread: Thread = _
6969
@volatile private var allocator: YarnAllocator = _
70+
private val allocatorLock = new Object()
7071

7172
// Fields used in client mode.
7273
private var rpcEnv: RpcEnv = null
@@ -359,7 +360,9 @@ private[spark] class ApplicationMaster(
359360
}
360361
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
361362
s"Sleeping for $sleepInterval.")
362-
Thread.sleep(sleepInterval)
363+
allocatorLock.synchronized {
364+
allocatorLock.wait(sleepInterval)
365+
}
363366
} catch {
364367
case e: InterruptedException =>
365368
}
@@ -546,8 +549,15 @@ private[spark] class ApplicationMaster(
546549
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
547550
case RequestExecutors(requestedTotal) =>
548551
Option(allocator) match {
549-
case Some(a) => a.requestTotalExecutors(requestedTotal)
550-
case None => logWarning("Container allocator is not ready to request executors yet.")
552+
case Some(a) =>
553+
allocatorLock.synchronized {
554+
if (a.requestTotalExecutors(requestedTotal)) {
555+
allocatorLock.notifyAll()
556+
}
557+
}
558+
559+
case None =>
560+
logWarning("Container allocator is not ready to request executors yet.")
551561
}
552562
context.reply(true)
553563

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -146,11 +146,16 @@ private[yarn] class YarnAllocator(
146146
* Request as many executors from the ResourceManager as needed to reach the desired total. If
147147
* the requested total is smaller than the current number of running executors, no executors will
148148
* be killed.
149+
*
150+
* @return Whether the new requested total is different than the old value.
149151
*/
150-
def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
152+
def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
151153
if (requestedTotal != targetNumExecutors) {
152154
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
153155
targetNumExecutors = requestedTotal
156+
true
157+
} else {
158+
false
154159
}
155160
}
156161

0 commit comments

Comments
 (0)