Skip to content

Commit 7dd6298

Browse files
committed
Simplify ClientBase#monitorApplication
There's only so much abstraction you can do before it gets too complicated.
1 parent 547487c commit 7dd6298

File tree

2 files changed

+16
-15
lines changed

2 files changed

+16
-15
lines changed

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -412,17 +412,15 @@ private[spark] trait ClientBase extends Logging {
412412
*
413413
* @param returnOnRunning Whether to also return the application state when it is RUNNING.
414414
* @param logApplicationReport Whether to log details of the application report every iteration.
415-
* @param shouldKeepMonitoring The condition to keep monitoring.
416415
* @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
417416
*/
418417
def monitorApplication(
419418
appId: ApplicationId,
420419
returnOnRunning: Boolean = false,
421-
logApplicationReport: Boolean = true,
422-
shouldKeepMonitoring: () => Boolean = () => true): YarnApplicationState = {
420+
logApplicationReport: Boolean = true): YarnApplicationState = {
423421
val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
424422
var firstIteration = true
425-
while (shouldKeepMonitoring()) {
423+
while (true) {
426424
Thread.sleep(interval)
427425
val report = getApplicationReport(appId)
428426
val state = report.getYarnApplicationState

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 14 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -117,25 +117,28 @@ private[spark] class YarnClientSchedulerBackend(
117117
* If the application has exited for any reason, stop the SparkContext.
118118
* This assumes both `client` and `appId` have already been set.
119119
*/
120-
private def asyncMonitorApplication(): Thread = {
120+
private def asyncMonitorApplication(): Unit = {
121121
assert(client != null && appId != null, "Application has not been submitted yet!")
122122
val t = new Thread {
123123
override def run() {
124-
val state = client.monitorApplication(
125-
appId, logApplicationReport = false, shouldKeepMonitoring = isStopping) // blocking
126-
if (state == YarnApplicationState.FINISHED ||
127-
state == YarnApplicationState.KILLED ||
128-
state == YarnApplicationState.FAILED) {
129-
logWarning(s"Yarn application has exited: $state")
130-
sc.stop()
131-
stopping = true
124+
while (!stopping) {
125+
val report = client.getApplicationReport(appId)
126+
val state = report.getYarnApplicationState()
127+
if (state == YarnApplicationState.FINISHED ||
128+
state == YarnApplicationState.KILLED ||
129+
state == YarnApplicationState.FAILED) {
130+
logError(s"Yarn application has already exited with state $state!")
131+
sc.stop()
132+
stopping = true
133+
}
134+
Thread.sleep(1000L)
132135
}
136+
Thread.currentThread().interrupt()
133137
}
134138
}
135-
t.setName("Yarn Application State Monitor")
139+
t.setName("Yarn application state monitor")
136140
t.setDaemon(true)
137141
t.start()
138-
t
139142
}
140143

141144
/**

0 commit comments

Comments
 (0)