[SPARK-26329][CORE] Faster polling of executor memory metrics. #23767
Conversation
Test build #102260 has finished for PR 23767 at commit
Test build #102266 has finished for PR 23767 at commit
Test build #102271 has finished for PR 23767 at commit
Test build #102280 has finished for PR 23767 at commit
Retest this please.
Test build #102301 has finished for PR 23767 at commit
Retest this please.
Test build #102311 has finished for PR 23767 at commit
retest this please
PySpark test failures look irrelevant.
Test build #102329 has finished for PR 23767 at commit
The PySpark tests and PySpark packaging tests were successful this time, but the build errored in running SparkR tests.
retest this please
Test build #102358 has finished for PR 23767 at commit
Review comments on core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala (outdated, resolved)
@mccheah you might be interested in this too
@edwinalu thank you for your feedback. I'll be on vacation next week. When I come back, I'll look into your comments further.
Review comments on core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala (outdated, resolved)
Review comments on core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala (resolved)
btw we should probably also update the docs on
Force-pushed 3db5924 to 4a947e9
Test build #103079 has finished for PR 23767 at commit
Retest this please.
Test build #103104 has finished for PR 23767 at commit
Force-pushed 4a947e9 to e13edaa
For each executor (or the driver), the executor metrics are polled by a single thread, so I don't think there are any concerns there. It's only in the tracking of per-stage and per-task metric peaks that there are concurrency concerns around the data structures used to track them.
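To make that concurrency concern concrete, here is a minimal sketch (not the PR's actual code) of one lock-free way to record peaks: the polling thread raises values in an AtomicLongArray with a compare-and-set loop, while heartbeat and task-end code can read (and reset) the same array from other threads.

```scala
import java.util.concurrent.atomic.AtomicLongArray

object PeakUpdateSketch {
  // Raise each recorded peak to at least the corresponding current value.
  // compareAndSet retries only if another thread changed the slot in between.
  def updatePeaks(peaks: AtomicLongArray, current: Array[Long]): Unit = {
    for (i <- current.indices) {
      var done = false
      while (!done) {
        val prev = peaks.get(i)
        done = current(i) <= prev || peaks.compareAndSet(i, prev, current(i))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val peaks = new AtomicLongArray(3)
    updatePeaks(peaks, Array(5L, 2L, 7L))
    updatePeaks(peaks, Array(3L, 4L, 6L))
    println((0 until 3).map(peaks.get)) // Vector(5, 4, 7)
  }
}
```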
Test build #103242 has finished for PR 23767 at commit
org.apache.spark.streaming.kafka010.DirectKafkaStreamSuite."offset recovery from kafka" failed.
…sult and TaskFailedReason.
…ggregation. Add an internal check within the test showing how the expected aggregated metrics are calculated. This documents how the expected values are arrived at.
…tor metrics aggregation.
In ExecutorMetricsPoller, combine onTaskLaunch and onTaskStart, and combine onTaskCompletion and onTaskCleanup. Simplify getExecutorUpdates, as there should never be a stage in stageTCMP with task count == 0. Some code format cleanup and minor improvements.
Add some explanatory comments around existing helper methods and classes and how they are currently invoked.
…utable.Map. In Heartbeat, since the Map we actually construct and send as executorUpdates is a mutable one, make the signature use a scala.collection.mutable.Map instead of scala.collection.Map.
There are three existing cases in HistoryServerSuite related to executor metrics. Replace them with a single case that shows all the executor metrics we currently collect; they include proc fs metrics and garbage collection metrics. Uploaded new event logs. These show executor metrics at task end.
…gle pass. Also, make driver stage key a constant; and use SAM syntax.
Fix some bugs in ExecutorSuite as well.
Force-pushed a21ac84 to 7331b27
Test build #108351 has finished for PR 23767 at commit
some super teeny comments, otherwise lgtm
Review comments on core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala (outdated, resolved)
Review comments on core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala (outdated, resolved)
Test build #108484 has finished for PR 23767 at commit
merged to master, thanks @wypoon !
@@ -574,7 +600,8 @@ private[spark] class Executor(
         logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")

         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-        val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
+        val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
+        val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums, metricPeaks))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

       case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
@wypoon Is there any special reason that metricPeaks is not sent in this case and in the CommitDeniedException case?
There is no special reason. I saw that in some cases, accumulators are collected and sent in the TaskFailedReason, and in those cases I send the task metric peaks as well; in the other cases, accumulators are not collected, so I don't send the task metric peaks either.
…tion to the metrics system

## What changes were proposed in this pull request?
This PR proposes to add instrumentation of memory usage via the Spark Dropwizard/Codahale metrics system. Memory usage metrics are available via the Executor metrics, recently implemented as detailed in https://issues.apache.org/jira/browse/SPARK-23206.
Additional notes: this takes advantage of the metrics poller introduced in #23767.

## Why are the changes needed?
Executor metrics provide many useful insights into memory usage, in particular the usage of storage memory and executor memory. This is useful for troubleshooting. Having the information in the metrics system makes it possible to add those metrics to Spark performance dashboards and to study memory usage as a function of time, as in the example graph https://issues.apache.org/jira/secure/attachment/12962810/Example_dashboard_Spark_Memory_Metrics.PNG

## Does this PR introduce any user-facing change?
Adds an `ExecutorMetrics` source to publish executor metrics via the Dropwizard metrics system. Details of the available metrics are in docs/monitoring.md. Adds the configuration parameter `spark.metrics.executormetrics.source.enabled`.

## How was this patch tested?
Tested on a YARN cluster and with an existing setup for a Spark dashboard based on InfluxDB and Grafana.

Closes #24132 from LucaCanali/memoryMetricsSource.
Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
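As a hypothetical usage sketch (the config key is quoted from the commit message above; check docs/monitoring.md for the exact name in your Spark version), the new source might be enabled like this:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical: enable the executor-metrics Dropwizard source described above.
// The resulting metrics flow to whatever sinks are configured in metrics.properties.
val conf = new SparkConf()
  .setAppName("memory-metrics-demo")
  .set("spark.metrics.executormetrics.source.enabled", "true")

val spark = SparkSession.builder.config(conf).getOrCreate()
```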
What changes were proposed in this pull request?
Prior to this change, in an executor, on each heartbeat, memory metrics are polled and sent in the heartbeat. The heartbeat interval is 10s by default. With this change, in an executor, memory metrics can optionally be polled in a separate poller at a shorter interval.
For each executor, we use a map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) to track what stages are active as well as the per-stage memory metric peaks. When polling the executor memory metrics, we attribute the memory to the active stage(s), and update the peaks. In a heartbeat, we send the per-stage peaks (for stages active at that time), and then reset the peaks. The semantics would be that the per-stage peaks sent in each heartbeat are the peaks since the last heartbeat.
We also keep a map of taskId to memory metric peaks. This tracks the metric peaks during the lifetime of the task. The polling thread updates this as well. At the end of a task, we send the peak metric values in the task result. In case of task failure, we send the peak metric values in the TaskFailedReason.
We continue to do the stage-level aggregation in the EventLoggingListener.
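Below is a rough, self-contained sketch of the bookkeeping described above. It is not the PR's actual ExecutorMetricsPoller: the class and member names, the `numMetrics`/`currentMetricValues` stand-ins, and the scheduling code in the demo are illustrative assumptions.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}
import scala.collection.JavaConverters._

class MetricsPeakTracker(numMetrics: Int, currentMetricValues: () => Array[Long]) {

  private case class StageEntry(runningTasks: AtomicLong, peaks: AtomicLongArray)

  // (stageId, stageAttemptId) -> (count of running tasks, per-stage metric peaks)
  private val stageTCMP = new ConcurrentHashMap[(Int, Int), StageEntry]()
  // taskId -> metric peaks over the lifetime of the task
  private val taskPeaks = new ConcurrentHashMap[Long, AtomicLongArray]()

  def onTaskStart(taskId: Long, stageId: Int, stageAttempt: Int): Unit = {
    taskPeaks.putIfAbsent(taskId, new AtomicLongArray(numMetrics))
    stageTCMP.computeIfAbsent((stageId, stageAttempt),
      new java.util.function.Function[(Int, Int), StageEntry] {
        def apply(k: (Int, Int)): StageEntry =
          StageEntry(new AtomicLong(0), new AtomicLongArray(numMetrics))
      }).runningTasks.incrementAndGet()
  }

  def onTaskCompletion(taskId: Long, stageId: Int, stageAttempt: Int): Unit = {
    // A real implementation would also prune stage entries whose task count drops
    // to zero; that cleanup (and its races) is elided in this sketch.
    Option(stageTCMP.get((stageId, stageAttempt))).foreach(_.runningTasks.decrementAndGet())
  }

  // Run by the single polling thread: attribute the current metric values to all
  // active stages and running tasks by raising their recorded peaks.
  def poll(): Unit = {
    val current = currentMetricValues()
    stageTCMP.values().asScala.foreach(e => raisePeaks(e.peaks, current))
    taskPeaks.values().asScala.foreach(p => raisePeaks(p, current))
  }

  // Called on a heartbeat: snapshot the per-stage peaks seen since the last
  // heartbeat, then reset them, giving "peaks since the last heartbeat" semantics.
  def getExecutorUpdates(): Map[(Int, Int), Array[Long]] =
    stageTCMP.asScala.map { case (stage, e) =>
      stage -> Array.tabulate(numMetrics)(i => e.peaks.getAndSet(i, 0L))
    }.toMap

  // Called when a task finishes (successfully or not) to attach its lifetime peaks
  // to the task result or the failure reason.
  def getTaskMetricPeaks(taskId: Long): Array[Long] = {
    val peaks = taskPeaks.remove(taskId)
    if (peaks == null) Array.fill(numMetrics)(0L) else Array.tabulate(numMetrics)(peaks.get)
  }

  // Raise each recorded peak to at least the current value (lock-free, via CAS).
  private def raisePeaks(peaks: AtomicLongArray, current: Array[Long]): Unit =
    for (i <- current.indices) {
      var done = false
      while (!done) {
        val prev = peaks.get(i)
        done = current(i) <= prev || peaks.compareAndSet(i, prev, current(i))
      }
    }
}

object MetricsPeakTrackerDemo extends App {
  // Fake "current metrics": values a real poller would read from the JVM / procfs.
  val tracker = new MetricsPeakTracker(3, () => Array(scala.util.Random.nextLong().abs % 100, 42L, 7L))
  tracker.onTaskStart(taskId = 1L, stageId = 0, stageAttempt = 0)

  // Poll on a short interval, independently of the (default 10s) heartbeat.
  val poller = Executors.newSingleThreadScheduledExecutor()
  poller.scheduleAtFixedRate(() => tracker.poll(), 0, 100, TimeUnit.MILLISECONDS)

  Thread.sleep(500)
  println(tracker.getExecutorUpdates().mapValues(_.toSeq)) // per-stage peaks since last heartbeat
  tracker.onTaskCompletion(1L, 0, 0)
  println(tracker.getTaskMetricPeaks(1L).toSeq)            // lifetime peaks for task 1
  poller.shutdown()
}
```

The snapshot-and-reset in getExecutorUpdates is what gives the "peaks since the last heartbeat" semantics; per-task peaks, by contrast, are kept for the whole task lifetime and only removed when the task result (or failure reason) is built.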
For the driver, we still only poll on heartbeats. What the driver sends will be the current values of the metrics in the driver at the time of the heartbeat. This is semantically the same as before.
How was this patch tested?
Unit tests. Manually tested applications on an actual system and checked the event logs; the metrics appear in the SparkListenerTaskEnd and SparkListenerStageExecutorMetrics events.