Skip to content

Commit 665701d

Browse files
committed
Exit when reach max number failed executors
1 parent 9306b8c commit 665701d

File tree

4 files changed

+40
-19
lines changed

4 files changed

+40
-19
lines changed

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
267267
// TODO: This is a bit ugly. Can we make it nicer?
268268
// TODO: Handle container failure
269269

270-
// Exists the loop if the user thread exits.
271-
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
272-
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
273-
finishApplicationMaster(FinalApplicationStatus.FAILED,
274-
"max number of executor failures reached")
275-
}
270+
// Exits the loop if the user thread exits.
271+
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
272+
&& !isFinished) {
273+
checkNumExecutorsFailed()
276274
yarnAllocator.allocateContainers(
277275
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
278276
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
@@ -303,11 +301,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
303301

304302
val t = new Thread {
305303
override def run() {
306-
while (userThread.isAlive) {
307-
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
308-
finishApplicationMaster(FinalApplicationStatus.FAILED,
309-
"max number of executor failures reached")
310-
}
304+
while (userThread.isAlive && !isFinished) {
305+
checkNumExecutorsFailed()
311306
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
312307
if (missingExecutorCount > 0) {
313308
logInfo("Allocating %d containers to make up for (potentially) lost containers".
@@ -327,6 +322,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
327322
t
328323
}
329324

325+
private def checkNumExecutorsFailed() {
326+
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
327+
logInfo("max number of executor failures reached")
328+
finishApplicationMaster(FinalApplicationStatus.FAILED,
329+
"max number of executor failures reached")
330+
// make sure to stop the user thread
331+
val sparkContext = ApplicationMaster.sparkContextRef.get()
332+
if (sparkContext != null) {
333+
logInfo("Invoking sc stop from checkNumExecutorsFailed")
334+
sparkContext.stop()
335+
} else {
336+
logError("sparkContext is null when should shutdown")
337+
}
338+
}
339+
}
340+
330341
private def sendProgress() {
331342
logDebug("Sending progress")
332343
// Simulated with an allocate request with no nodes requested ...

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
249249
// Wait until all containers have finished
250250
// TODO: This is a bit ugly. Can we make it nicer?
251251
// TODO: Handle container failure
252-
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
252+
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
253+
!isFinished) {
253254
yarnAllocator.allocateContainers(
254255
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
255256
checkNumExecutorsFailed()
@@ -271,7 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
271272

272273
val t = new Thread {
273274
override def run() {
274-
while (!driverClosed) {
275+
while (!driverClosed && !isFinished) {
275276
checkNumExecutorsFailed()
276277
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
277278
if (missingExecutorCount > 0) {

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -247,13 +247,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
247247
yarnAllocator.allocateResources()
248248
// Exits the loop if the user thread exits.
249249

250-
var iters = 0
251-
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
250+
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
251+
&& !isFinished) {
252252
checkNumExecutorsFailed()
253253
allocateMissingExecutor()
254254
yarnAllocator.allocateResources()
255255
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
256-
iters += 1
257256
}
258257
}
259258
logInfo("All executors have launched.")
@@ -271,8 +270,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
271270

272271
private def checkNumExecutorsFailed() {
273272
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
273+
logInfo("max number of executor failures reached")
274274
finishApplicationMaster(FinalApplicationStatus.FAILED,
275275
"max number of executor failures reached")
276+
// make sure to stop the user thread
277+
val sparkContext = ApplicationMaster.sparkContextRef.get()
278+
if (sparkContext != null) {
279+
logInfo("Invoking sc stop from checkNumExecutorsFailed")
280+
sparkContext.stop()
281+
} else {
282+
logError("sparkContext is null when should shutdown")
283+
}
276284
}
277285
}
278286

@@ -289,7 +297,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
289297

290298
val t = new Thread {
291299
override def run() {
292-
while (userThread.isAlive) {
300+
while (userThread.isAlive && !isFinished) {
293301
checkNumExecutorsFailed()
294302
allocateMissingExecutor()
295303
logDebug("Sending progress")

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
217217
// Wait until all containers have launched
218218
yarnAllocator.addResourceRequests(args.numExecutors)
219219
yarnAllocator.allocateResources()
220-
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
220+
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
221+
!isFinished) {
221222
checkNumExecutorsFailed()
222223
allocateMissingExecutor()
223224
yarnAllocator.allocateResources()
@@ -249,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
249250

250251
val t = new Thread {
251252
override def run() {
252-
while (!driverClosed) {
253+
while (!driverClosed && !isFinished) {
253254
checkNumExecutorsFailed()
254255
allocateMissingExecutor()
255256
logDebug("Sending progress")

0 commit comments

Comments
 (0)