Skip to content

[SPARK-10918] [CORE] Prevent task failed for executor kill by driver #8975

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ private[spark] class ExecutorAllocationManager(

// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
with mutable.SynchronizedSet[String]

// All known executors
private val executorIds = new mutable.HashSet[String]
private val executorIds = new mutable.HashSet[String] with mutable.SynchronizedSet[String]

// A timestamp of when an addition should be triggered, or NOT_SET if it is not set
// This is set when pending tasks are added but not scheduled yet
Expand All @@ -137,6 +138,7 @@ private[spark] class ExecutorAllocationManager(
// A timestamp for each executor of when the executor should be removed, indexed by the ID
// This is set when an executor is no longer running a task, or when it first registers
private val removeTimes = new mutable.HashMap[String, Long]
with mutable.SynchronizedMap[String, Long]

// Polling loop interval (ms)
private val intervalMillis: Long = 100
Expand Down Expand Up @@ -502,11 +504,15 @@ private[spark] class ExecutorAllocationManager(
}
}

def isExecutorPendingToRemove(executorId: String): Boolean = {
!executorsPendingToRemove.contains(executorId)
}

/**
* Callback invoked when the specified executor is now running a task.
* This resets all variables used for removing this executor.
*/
private def onExecutorBusy(executorId: String): Unit = synchronized {
def onExecutorBusy(executorId: String): Unit = synchronized {
logDebug(s"Clearing idle timer for $executorId because it is now running a task")
removeTimes.remove(executorId)
}
Expand Down Expand Up @@ -605,7 +611,6 @@ private[spark] class ExecutorAllocationManager(

// Mark the executor on which this task is scheduled as busy
executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
allocationManager.onExecutorBusy(executorId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,16 @@ private[spark] class TaskSetManager(
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()

val executorManager = sched.sc.executorAllocationManager.getOrElse(null)
if (executorManager != null) {
if (!executorManager.isExecutorPendingToRemove(execId)) {
logWarning(s"Executor $execId is removed before scheduler task.")
return None
}
executorManager.onExecutorBusy(execId)
}

// Do various bookkeeping
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
Expand Down