[SPARK-11801][CORE] Notify driver when OOM is thrown before executor … #9866

Status: Closed (wants to merge 1 commit)
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -198,6 +198,15 @@ case object TaskKilled extends TaskFailedReason {
override def toErrorString: String = "TaskKilled (killed intentionally)"
}

/**
* :: DeveloperApi ::
* Task caught OOM exception and needs to be rescheduled.
*/
@DeveloperApi
case object TaskOutOfMemory extends TaskFailedReason {
override def toErrorString: String = "TaskOutOfMemory (task caught OutOfMemoryError)"
Review comment (Contributor): Doesn't seem like you really need this? Can't you just use ExceptionFailure?
}

/**
* :: DeveloperApi ::
* Task requested the driver to commit, but was denied.
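The reviewer's point is that the driver could recover the same information from an ExceptionFailure, whereas a dedicated case object lets the scheduler branch on the reason directly. A minimal, self-contained sketch of that trade-off, using hypothetical stand-in types rather than Spark's actual class hierarchy:

```scala
// Hypothetical stand-ins for illustration; not Spark's actual classes.
sealed trait TaskFailedReason { def toErrorString: String }

case object TaskOutOfMemory extends TaskFailedReason {
  override def toErrorString: String = "TaskOutOfMemory (task caught OutOfMemoryError)"
}

case class ExceptionFailure(className: String) extends TaskFailedReason {
  override def toErrorString: String = s"ExceptionFailure: $className"
}

object DriverSideSketch {
  // A dedicated case object lets the scheduler match on the reason directly;
  // with ExceptionFailure it would have to inspect the carried class name.
  def isOutOfMemory(reason: TaskFailedReason): Boolean = reason match {
    case TaskOutOfMemory => true
    case ExceptionFailure(cls) => cls == "java.lang.OutOfMemoryError"
    case _ => false
  }
}
```

Either branch above identifies the OOM, which is why the reviewer questions whether the new case object is strictly necessary.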
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
Review comment (Contributor): JavaConversions is discouraged (it's hard for the reader to see where the conversion is happening). Stick to JavaConverters and explicitly call asScala / asJava.
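A short sketch of the style the reviewer is asking for, using only the standard library (the object and method names here are illustrative):

```scala
import scala.collection.JavaConverters._

object ConvertersSketch {
  // The explicit .asScala call marks exactly where the Java collection
  // crosses into the Scala world; nothing is converted implicitly.
  def doubled(jList: java.util.List[Int]): List[Int] =
    jList.asScala.toList.map(_ * 2)
}
```

With JavaConversions the same code would compile without any visible conversion call, which is precisely what makes it hard to audit.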

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

@@ -113,8 +114,12 @@ private[spark] class Executor(
// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

private val shutdownHookLock = new Object()

startDriverHeartbeater()

addShutdownHook()

def launchTask(
context: ExecutorBackend,
taskId: Long,
Expand Down Expand Up @@ -145,6 +150,24 @@ private[spark] class Executor(
}
}

private def addShutdownHook(): AnyRef = {
ShutdownHookManager.addShutdownHook(ShutdownHookManager.EXECUTOR_SHUTDOWN_PRIORITY) { () =>
// Synchronized with the OOM task handler, which is sending its status update to the driver.
// Please note that the assumption here is that the OOM thread is still running and will get
// the lock prior to this.
shutdownHookLock.synchronized {
// Cleanup all the tasks which are currently running, so that they would not through
Review comment (Contributor): typo: through -> throw
// undesirable error messages during shutdown. Please note that kill interrupts all
// the running threads and tasks will be killed when interrupts are actually handled.
runningTasks.values.foreach(t => t.kill(true))
}
logInfo("Shutdown hook called")

// Sleep to make sure the status update from the OOM handler is flushed to the driver.
Thread.sleep(conf.getInt("spark.executor.shutdownHookDelay", 300))
}
}

/** Returns the total amount of time this JVM process has spent in garbage collection. */
private def computeTotalGcTime(): Long = {
ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getCollectionTime).sum
@@ -291,6 +314,12 @@ private[spark] class Executor(
val reason = cDE.toTaskEndReason
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

case oom: OutOfMemoryError =>
shutdownHookLock.synchronized {
Review comment (Contributor): I don't think we know for sure that this thread will always win the race. If it loses, you don't want this thread to wait. I think you can get around this, though, by using Lock.tryLock here (and continuing whether or not you get the lock), and using Lock.lock from the other thread.
logError("Out of memory exception in " + Thread.currentThread(), oom)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskOutOfMemory))
}

case t: Throwable =>
// Attempt to exit cleanly by informing the driver of our failure.
// If anything goes wrong (or this was a fatal exception), we will delegate to
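The reviewer's tryLock suggestion can be sketched in isolation. This is a toy model of the two racing paths, not the PR's actual code; the object and method names are assumptions:

```scala
import java.util.concurrent.locks.ReentrantLock

object TryLockSketch {
  private val shutdownLock = new ReentrantLock()

  // OOM-handler path: never block. If the shutdown hook already holds the
  // lock, skip the status update instead of waiting behind the hook.
  def reportOom(sendStatusUpdate: () => Unit): Boolean = {
    if (shutdownLock.tryLock()) {
      try { sendStatusUpdate(); true }
      finally shutdownLock.unlock()
    } else false
  }

  // Shutdown-hook path: a blocking lock() is acceptable here, since the JVM
  // is going down anyway and the cleanup must run regardless.
  def runShutdownCleanup(killRunningTasks: () => Unit): Unit = {
    shutdownLock.lock()
    try killRunningTasks()
    finally shutdownLock.unlock()
  }
}
```

Unlike `synchronized`, `tryLock()` returns immediately when the lock is contended, so the OOM thread can lose the race without deadlocking the shutdown sequence.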
@@ -144,10 +144,11 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}

private def addShutdownHook(): AnyRef = {
ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
logInfo("Shutdown hook called")
DiskBlockManager.this.doStop()
}
ShutdownHookManager.addShutdownHook(
ShutdownHookManager.DISK_BLOCK_MANAGER_SHUTDOWN_PRIORITY) { () =>
logInfo("Shutdown hook called")
DiskBlockManager.this.doStop()
}
}

/** Cleanup local dirs and stop shuffle sender. */
@@ -45,6 +45,18 @@ private[spark] object ShutdownHookManager extends Logging {
*/
val TEMP_DIR_SHUTDOWN_PRIORITY = 25

/**
* The shutdown priority of the disk block manager should be higher than that of the temp directory.
*/
val DISK_BLOCK_MANAGER_SHUTDOWN_PRIORITY = 26

/**
* The shutdown priority of the Executor, used to clean up the currently running tasks. Its
* priority should be higher than the DiskBlockManager shutdown priority, as the tasks may throw
* unhandled exceptions if temp directory cleanup happens in parallel.
*/
val EXECUTOR_SHUTDOWN_PRIORITY = 27

private lazy val shutdownHooks = {
val manager = new SparkShutdownHookManager()
manager.install()
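The ordering these constants establish (executor task cleanup at 27, then disk block manager at 26, then temp-dir removal at 25) can be illustrated with a toy hook registry. This is a sketch of the priority semantics only, not Spark's SparkShutdownHookManager; all names here are hypothetical:

```scala
import scala.collection.mutable

object HookPrioritySketch {
  // Mirrors the constants in this diff (illustrative copies).
  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
  val DISK_BLOCK_MANAGER_SHUTDOWN_PRIORITY = 26
  val EXECUTOR_SHUTDOWN_PRIORITY = 27

  private case class Hook(priority: Int, body: () => Unit)
  private val hooks = mutable.ArrayBuffer.empty[Hook]

  def addShutdownHook(priority: Int)(body: => Unit): Unit =
    hooks += Hook(priority, () => body)

  // Higher-priority hooks run first: task cleanup (27) happens before the
  // disk block manager stops (26), which happens before temp dirs go (25).
  def runAll(): Seq[Int] = {
    val ordered = hooks.sortBy(-_.priority)
    ordered.foreach(_.body())
    ordered.map(_.priority)
  }
}
```

Running hooks highest-priority-first is what prevents the failure mode described above: tasks are killed before the directories they might still touch are deleted.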