-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-11801][CORE] Notify driver when OOM is thrown before executor … #9866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer | |
import java.util.concurrent.{ConcurrentHashMap, TimeUnit} | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.collection.JavaConversions._ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
import scala.collection.mutable.{ArrayBuffer, HashMap} | ||
import scala.util.control.NonFatal | ||
|
||
|
@@ -113,8 +114,12 @@ private[spark] class Executor( | |
// Executor for the heartbeat task. | ||
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater") | ||
|
||
private val shutdownHookLock = new Object() | ||
|
||
startDriverHeartbeater() | ||
|
||
addShutdownHook() | ||
|
||
def launchTask( | ||
context: ExecutorBackend, | ||
taskId: Long, | ||
|
@@ -145,6 +150,24 @@ private[spark] class Executor( | |
} | ||
} | ||
|
||
private def addShutdownHook(): AnyRef = { | ||
ShutdownHookManager.addShutdownHook(ShutdownHookManager.EXECUTOR_SHUTDOWN_PRIORITY) { () => | ||
// Synchronized with the OOM task handler, which is sending its status update to driver. | ||
// Please note that the assumption here is that OOM thread is still running and will get | ||
// the lock prior to this. | ||
shutdownHookLock.synchronized { | ||
// Cleanup all the tasks which are currently running, so that they would not throw | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo: through -> throw |
||
// undesirable error messages during shutdown. Please note that kill interrupts all | ||
// the running threads and tasks will be killed when interrupts are actually handled. | ||
runningTasks.values.foreach(t => t.kill(true)) | ||
} | ||
logInfo("shutdown hook called") | ||
|
||
// Sleep to make sure that status update from the OOM handler is flushed to driver. | ||
Thread.sleep(conf.getInt("spark.executor.shutdownHookDelay", 300)) | ||
} | ||
} | ||
|
||
/** Returns the total amount of time this JVM process has spent in garbage collection. */ | ||
private def computeTotalGcTime(): Long = { | ||
ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getCollectionTime).sum | ||
|
@@ -291,6 +314,12 @@ private[spark] class Executor( | |
val reason = cDE.toTaskEndReason | ||
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) | ||
|
||
case oom: OutOfMemoryError => | ||
shutdownHookLock.synchronized { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we know for sure that this thread will always win the race. If it loses, you dont' want this thread to wait. I think you can get around this, though, by using |
||
logError("Out of memory exception in " + Thread.currentThread(), oom) | ||
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskOutOfMemory)) | ||
} | ||
|
||
case t: Throwable => | ||
// Attempt to exit cleanly by informing the driver of our failure. | ||
// If anything goes wrong (or this was a fatal exception), we will delegate to | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't seem like you really need this? can't you just use
ExceptionFailure
?