-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-26329][CORE] Faster polling of executor memory metrics. #23767
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9f5b195
03e41a8
7f6bd74
7397897
e1aeafc
75ba39d
ea2ff0d
0cbfc04
8cb30a8
077abb0
7a3c90d
3ed583a
0a4828a
9530b75
38a397c
e062e60
20b4b7e
b898ad2
fbb55bf
99addf1
7331b27
7556d6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ import java.util.concurrent._ | |
import javax.annotation.concurrent.GuardedBy | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.collection.mutable.{ArrayBuffer, HashMap, Map} | ||
import scala.collection.mutable.{ArrayBuffer, HashMap, Map, WrappedArray} | ||
import scala.concurrent.duration._ | ||
import scala.util.control.NonFatal | ||
|
||
|
@@ -188,9 +188,20 @@ private[spark] class Executor( | |
*/ | ||
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL) | ||
|
||
/** | ||
* Interval to poll for executor metrics, in milliseconds | ||
*/ | ||
private val METRICS_POLLING_INTERVAL_MS = conf.get(EXECUTOR_METRICS_POLLING_INTERVAL) | ||
wypoon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private val pollOnHeartbeat = if (METRICS_POLLING_INTERVAL_MS > 0) false else true | ||
|
||
// Poller for the memory metrics. Visible for testing. | ||
private[executor] val metricsPoller = new ExecutorMetricsPoller( | ||
env.memoryManager, | ||
METRICS_POLLING_INTERVAL_MS) | ||
|
||
// Executor for the heartbeat task. | ||
private val heartbeater = new Heartbeater( | ||
env.memoryManager, | ||
() => Executor.this.reportHeartBeat(), | ||
"executor-heartbeater", | ||
HEARTBEAT_INTERVAL_MS) | ||
|
@@ -207,6 +218,8 @@ private[spark] class Executor( | |
|
||
heartbeater.start() | ||
|
||
metricsPoller.start() | ||
|
||
private[executor] def numRunningTasks: Int = runningTasks.size() | ||
|
||
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { | ||
|
@@ -254,12 +267,18 @@ private[spark] class Executor( | |
|
||
def stop(): Unit = { | ||
env.metricsSystem.report() | ||
try { | ||
wypoon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
metricsPoller.stop() | ||
} catch { | ||
case NonFatal(e) => | ||
logWarning("Unable to stop executor metrics poller", e) | ||
} | ||
try { | ||
heartbeater.stop() | ||
} catch { | ||
case NonFatal(e) => | ||
logWarning("Unable to stop heartbeater", e) | ||
} | ||
} | ||
threadPool.shutdown() | ||
|
||
// Notify plugins that executor is shutting down so they can terminate cleanly | ||
|
@@ -380,6 +399,7 @@ private[spark] class Executor( | |
var taskStartTimeNs: Long = 0 | ||
var taskStartCpu: Long = 0 | ||
startGCTime = computeTotalGcTime() | ||
var taskStarted: Boolean = false | ||
|
||
try { | ||
// Must be set before updateDependencies() is called, in case fetching dependencies | ||
|
@@ -412,6 +432,9 @@ private[spark] class Executor( | |
env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch) | ||
} | ||
|
||
metricsPoller.onTaskStart(taskId, task.stageId, task.stageAttemptId) | ||
taskStarted = true | ||
wypoon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Run the actual task and measure its runtime. | ||
taskStartTimeNs = System.nanoTime() | ||
taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) { | ||
|
@@ -529,8 +552,9 @@ private[spark] class Executor( | |
|
||
// Note: accumulator updates must be collected after TaskMetrics is updated | ||
val accumUpdates = task.collectAccumulatorUpdates() | ||
val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId) | ||
// TODO: do not serialize value twice | ||
val directResult = new DirectTaskResult(valueBytes, accumUpdates) | ||
val directResult = new DirectTaskResult(valueBytes, accumUpdates, metricPeaks) | ||
val serializedDirectResult = ser.serialize(directResult) | ||
val resultSize = serializedDirectResult.limit() | ||
|
||
|
@@ -559,13 +583,15 @@ private[spark] class Executor( | |
executorSource.SUCCEEDED_TASKS.inc(1L) | ||
setTaskFinishedAndClearInterruptStatus() | ||
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) | ||
|
||
} catch { | ||
case t: TaskKilledException => | ||
logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}") | ||
|
||
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs) | ||
val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums)) | ||
// Here and below, put task metric peaks in a WrappedArray to expose them as a Seq | ||
// without requiring a copy. | ||
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId)) | ||
val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums, metricPeaks)) | ||
wypoon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK) | ||
|
||
case _: InterruptedException | NonFatal(_) if | ||
|
@@ -574,7 +600,8 @@ private[spark] class Executor( | |
logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") | ||
|
||
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs) | ||
val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums)) | ||
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId)) | ||
val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums, metricPeaks)) | ||
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK) | ||
|
||
case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) => | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @wypoon Is there any special reason that … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There is no special reason. I saw that in some cases, accumulators are collected and sent in the … |
||
|
@@ -609,14 +636,19 @@ private[spark] class Executor( | |
// instead of an app issue). | ||
if (!ShutdownHookManager.inShutdown()) { | ||
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs) | ||
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId)) | ||
|
||
val serializedTaskEndReason = { | ||
try { | ||
ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)) | ||
val ef = new ExceptionFailure(t, accUpdates).withAccums(accums) | ||
.withMetricPeaks(metricPeaks) | ||
ser.serialize(ef) | ||
} catch { | ||
case _: NotSerializableException => | ||
// t is not serializable so just send the stacktrace | ||
ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) | ||
val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums) | ||
.withMetricPeaks(metricPeaks) | ||
ser.serialize(ef) | ||
} | ||
} | ||
setTaskFinishedAndClearInterruptStatus() | ||
|
@@ -632,6 +664,11 @@ private[spark] class Executor( | |
} | ||
} finally { | ||
runningTasks.remove(taskId) | ||
if (taskStarted) { | ||
// This means the task was successfully deserialized, its stageId and stageAttemptId | ||
// are known, and metricsPoller.onTaskStart was called. | ||
metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -848,8 +885,11 @@ private[spark] class Executor( | |
val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() | ||
val curGCTime = computeTotalGcTime() | ||
|
||
// get executor level memory metrics | ||
val executorUpdates = heartbeater.getCurrentMetrics() | ||
if (pollOnHeartbeat) { | ||
metricsPoller.poll() | ||
} | ||
|
||
val executorUpdates = metricsPoller.getExecutorUpdates() | ||
|
||
for (taskRunner <- runningTasks.values().asScala) { | ||
if (taskRunner.task != null) { | ||
|
Uh oh!
There was an error while loading. Please reload this page.