Skip to content

Commit dcd15e8

Browse files
committed
Use send for removeExecutor
1 parent 142de67 commit dcd15e8

File tree

2 files changed

+14
-26
lines changed

2 files changed

+14
-26
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
143143
// Ignoring the task kill since the executor is not registered.
144144
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
145145
}
146+
147+
case RemoveExecutor(executorId, reason) =>
148+
// We will remove the executor's state and cannot restore it. However, the connection
149+
// between the driver and the executor may be still alive so that the executor won't exit
150+
// automatically, so try to tell the executor to stop itself. See SPARK-13519.
151+
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
152+
removeExecutor(executorId, reason)
146153
}
147154

148155
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -196,14 +203,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
196203
}
197204
context.reply(true)
198205

199-
case RemoveExecutor(executorId, reason) =>
200-
// We will remove the executor's state and cannot restore it. However, the connection
201-
// between the driver and the executor may be still alive so that the executor won't exit
202-
// automatically, so try to tell the executor to stop itself. See SPARK-13519.
203-
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
204-
removeExecutor(executorId, reason)
205-
context.reply(true)
206-
207206
case RetrieveSparkProps =>
208207
context.reply(sparkProperties)
209208
}
@@ -407,20 +406,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
407406
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
408407
}
409408

410-
// Called by subclasses when notified of a lost worker
409+
/**
410+
* Called by subclasses when notified of a lost worker. It just fires the message and returns
411+
* at once.
412+
*/
411413
protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
412-
try {
413-
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
414-
} catch {
415-
case e: Exception =>
416-
throw new SparkException("Error notifying standalone scheduler's driver endpoint", e)
417-
}
418-
}
419-
420-
protected def removeExecutorAsync(
421-
executorId: String,
422-
reason: ExecutorLossReason): Future[Boolean] = {
423-
driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason))
414+
// Only log the failure since we don't care about the result.
415+
driverEndpoint.send(RemoveExecutor(executorId, reason))
424416
}
425417

426418
def sufficientResourcesRegistered(): Boolean = true

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,17 +148,13 @@ private[spark] class StandaloneSchedulerBackend(
148148
fullId, hostPort, cores, Utils.megabytesToString(memory)))
149149
}
150150

151-
/** Note: this method should not block. See [[StandaloneAppClientListener]] */
152151
override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
153152
val reason: ExecutorLossReason = exitStatus match {
154153
case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
155154
case None => SlaveLost(message)
156155
}
157156
logInfo("Executor %s removed: %s".format(fullId, message))
158-
// Only log the failure since we don't care about the result.
159-
removeExecutorAsync(fullId.split("/")(1), reason).onFailure { case t =>
160-
logError(t.getMessage, t)
161-
}(ThreadUtils.sameThread)
157+
removeExecutor(fullId.split("/")(1), reason)
162158
}
163159

164160
override def sufficientResourcesRegistered(): Boolean = {

0 commit comments

Comments
 (0)