
[SPARK-26329][CORE] Faster polling of executor memory metrics. #23767


Closed
wants to merge 22 commits

Conversation

wypoon
Contributor

@wypoon wypoon commented Feb 12, 2019

What changes were proposed in this pull request?

Prior to this change, an executor polls its memory metrics and sends them only on each heartbeat; the heartbeat interval is 10s by default. With this change, an executor can optionally poll memory metrics with a separate poller at a shorter interval.
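For illustration, a minimal sketch of how the faster polling might be enabled, assuming the configuration key this change introduces is spark.executor.metrics.pollingInterval (treat the key name and the values as illustrative; 0 would keep the old poll-on-heartbeat behaviour):

    import org.apache.spark.SparkConf

    // Illustrative configuration only; the key name and values are assumptions,
    // not taken verbatim from this PR.
    val conf = new SparkConf()
      .set("spark.executor.heartbeatInterval", "10s")      // default heartbeat interval
      .set("spark.executor.metrics.pollingInterval", "1s") // poll memory metrics every second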

For each executor, we use a map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) to track which stages are active as well as the per-stage memory metric peaks. When polling the executor memory metrics, we attribute the memory to the active stage(s) and update the peaks. In a heartbeat, we send the per-stage peaks (for stages active at that time) and then reset the peaks. The semantics are that the per-stage peaks sent in each heartbeat are the peaks observed since the last heartbeat.
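A minimal sketch of this bookkeeping, with hypothetical names (the PR's actual implementation is in ExecutorMetricsPoller; this only shows the shape of the map and the peak update):

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}
    import scala.collection.JavaConverters._

    // Hypothetical sketch, not the PR's code: one entry per active (stageId, stageAttemptId),
    // holding the running task count and the per-metric peaks since the last heartbeat.
    case class StageEntry(runningTasks: AtomicLong, peaks: AtomicLongArray)

    val stageTCMP = new ConcurrentHashMap[(Int, Int), StageEntry]()

    val maxOp: java.util.function.LongBinaryOperator = (a: Long, b: Long) => math.max(a, b)

    // Called from the polling thread: attribute the freshly polled values to every
    // active stage, keeping the running maximum per metric.
    def updateStagePeaks(latest: Array[Long]): Unit = {
      for (entry <- stageTCMP.values().asScala; i <- latest.indices) {
        entry.peaks.getAndAccumulate(i, latest(i), maxOp)
      }
    }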

We also keep a map of taskId to memory metric peaks. This tracks the metric peaks during the lifetime of the task. The polling thread updates this as well. At the end of a task, we send the peak metric values in the task result. In case of task failure, we send the peak metric values in the TaskFailedReason.
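Continuing the hypothetical sketch above for the per-task side (getTaskMetricPeaks mirrors the call visible in the Executor diff quoted later in this conversation; the rest is illustrative):

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicLongArray

    // Hypothetical sketch: peaks observed over each task's lifetime, keyed by taskId.
    val taskMetricPeaks = new ConcurrentHashMap[Long, AtomicLongArray]()

    // Read the peaks out as a plain array for the task result or TaskFailedReason.
    def getTaskMetricPeaks(taskId: Long): Array[Long] = {
      val peaks = taskMetricPeaks.get(taskId)
      if (peaks == null) Array.empty[Long]
      else Array.tabulate(peaks.length())(i => peaks.get(i))
    }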

We continue to do the stage-level aggregation in the EventLoggingListener.

For the driver, we still only poll on heartbeats. What the driver sends will be the current values of the metrics in the driver at the time of the heartbeat. This is semantically the same as before.

How was this patch tested?

Unit tests. Manually tested applications on an actual system and checked the event logs; the metrics appear in the SparkListenerTaskEnd and SparkListenerStageExecutorMetrics events.

@SparkQA

SparkQA commented Feb 12, 2019

Test build #102260 has finished for PR 23767 at commit ad8b5e5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 13, 2019

Test build #102266 has finished for PR 23767 at commit faa3b09.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon wypoon changed the title [SPARK-26329][CORE] Faster polling of executor memory metrics. [SPARK-26329][CORE][WIP] Faster polling of executor memory metrics. Feb 13, 2019
@SparkQA

SparkQA commented Feb 13, 2019

Test build #102271 has finished for PR 23767 at commit 0dd8600.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 13, 2019

Test build #102280 has finished for PR 23767 at commit 3db5924.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon
Contributor Author

wypoon commented Feb 13, 2019

Retest this please.

@SparkQA

SparkQA commented Feb 13, 2019

Test build #102301 has finished for PR 23767 at commit 3db5924.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon
Contributor Author

wypoon commented Feb 13, 2019

Retest this please.

@SparkQA

SparkQA commented Feb 14, 2019

Test build #102311 has finished for PR 23767 at commit 3db5924.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@HyukjinKwon
Member

The PySpark test failures look unrelated.

@SparkQA

SparkQA commented Feb 14, 2019

Test build #102329 has finished for PR 23767 at commit 3db5924.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon
Contributor Author

wypoon commented Feb 14, 2019

The PySpark tests and PySpark packaging tests were successful this time, but the build errored while running the SparkR tests.
In any case, Java and Scala unit tests are green.
@edwinalu can you please take a look at my change?

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Feb 14, 2019

Test build #102358 has finished for PR 23767 at commit 3db5924.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Feb 15, 2019

@mccheah you might be interested in this too

@wypoon
Contributor Author

wypoon commented Feb 17, 2019

@edwinalu thank you for your feedback. I'll be on vacation next week. When I come back, I'll look into your comments further.

@squito
Contributor

squito commented Feb 20, 2019

btw we should probably also update the docs on ExecutorMetricType to explain threading concerns, as it's less clear now. IIUC, implementations do not need to be thread-safe currently -- they will only be called by one thread at a time.

@wypoon wypoon force-pushed the wypoon_SPARK-26329 branch from 3db5924 to 4a947e9 Compare March 6, 2019 05:01
@SparkQA

SparkQA commented Mar 6, 2019

Test build #103079 has finished for PR 23767 at commit 4a947e9.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon
Contributor Author

wypoon commented Mar 6, 2019

Retest this please.

@SparkQA

SparkQA commented Mar 6, 2019

Test build #103104 has finished for PR 23767 at commit 4a947e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon wypoon force-pushed the wypoon_SPARK-26329 branch from 4a947e9 to e13edaa Compare March 9, 2019 00:05
@wypoon
Contributor Author

wypoon commented Mar 9, 2019

btw we should probably also update the docs on ExecutorMetricType to explain threading concerns, as it's less clear now. IIUC, implementations do not need to be thread-safe currently -- they will only be called by one thread at a time.

For each executor (or the driver), the executor metrics are polled by a single thread, so I don't think there are any thread-safety concerns for the metric implementations themselves. The concurrency concerns are only around the data structures used to track the per-stage and per-task metric peaks.
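To make the single-thread point concrete, here is a hedged sketch of the kind of scheduling involved (the interval and the body are placeholders, not the PR's code):

    import java.util.concurrent.{Executors, TimeUnit}

    val pollingIntervalMs = 1000L  // placeholder value
    // A single-threaded scheduler: metric implementations are only ever invoked
    // from this one thread, so they need not be thread-safe.
    val poller = Executors.newSingleThreadScheduledExecutor()
    poller.scheduleAtFixedRate(
      () => { /* poll the metrics and fold them into the per-stage/per-task peaks */ },
      0L, pollingIntervalMs, TimeUnit.MILLISECONDS)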

@SparkQA

SparkQA commented Mar 9, 2019

Test build #103242 has finished for PR 23767 at commit e13edaa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon
Contributor Author

wypoon commented Mar 10, 2019

org.apache.spark.streaming.kafka010.DirectKafkaStreamSuite."offset recovery from kafka" failed.
Appears to be flakiness. I ran the suite locally and it passes.

wypoon added 14 commits July 29, 2019 13:53
…ggregation.

Add an internal check within the test showing how the expected aggregated metrics are calculated. This documents how the expected values are arrived at (a toy illustration follows after this list of commits).
In ExecutorMetricsPoller, combine onTaskLaunch and onTaskStart, and combine
onTaskCompletion and onTaskCleanup. Simplify getExecutorUpdates, as there
should never be a stage in stageTCMP with task count == 0.

Some code format cleanup and minor improvements.
Add some explanatory comments around existing helper methods and classes
and how they are currently invoked.
…utable.Map.

In Heartbeat, since the Map we actually construct and send as executorUpdates
is a mutable one, make the signature use scala.collection.mutable.Map
instead of scala.collection.Map.
There are three existing cases in HistoryServerSuite related to executor
metrics. Replace them with a single case that shows all the executor metrics
we currently collect; they include proc fs metrics and garbage collection
metrics. Uploaded new event logs. These show executor metrics at task end.
…gle pass.

Also, make driver stage key a constant; and use SAM syntax.
Fix some bugs in ExecutorSuite as well.
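Relating to the commit above about the test's internal aggregation check, a toy illustration of the calculation it documents (the numbers are made up): the expected stage-level peak of each metric is the element-wise maximum over the per-task peaks.

    // Toy example: two tasks, two metrics; the expected stage peak per metric is
    // the element-wise maximum over the tasks' peak values.
    val taskPeaks: Seq[Array[Long]] = Seq(Array(10L, 50L), Array(30L, 20L))
    val expectedStagePeaks: Array[Long] =
      taskPeaks.reduce((a, b) => a.zip(b).map { case (x, y) => math.max(x, y) })
    assert(expectedStagePeaks.sameElements(Array(30L, 50L)))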
@wypoon wypoon force-pushed the wypoon_SPARK-26329 branch from a21ac84 to 7331b27 Compare July 29, 2019 22:29
@SparkQA

SparkQA commented Jul 30, 2019

Test build #108351 has finished for PR 23767 at commit 7331b27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@squito squito left a comment


some super teeny comments, otherwise lgtm

@SparkQA

SparkQA commented Jul 31, 2019

Test build #108484 has finished for PR 23767 at commit 7556d6a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Aug 1, 2019

merged to master, thanks @wypoon !

@asfgit asfgit closed this in 80ab19b Aug 1, 2019
@@ -574,7 +600,8 @@ private[spark] class Executor(
logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")

val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
- val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
+ val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
+ val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums, metricPeaks))
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
Member


@wypoon Is there any special reason that metricPeaks is not sent in this case and in the CommitDeniedException case?

Contributor Author

@wypoon wypoon Aug 19, 2019


There is no special reason. I saw that in some cases, accumulators are collected and sent in the TaskFailedReason, and in those cases I send the task metric peaks as well; in the other cases, accumulators are not collected so I don't send the task metric peaks either.

LucaCanali added a commit to LucaCanali/spark that referenced this pull request Nov 1, 2019
LucaCanali added a commit to LucaCanali/spark that referenced this pull request Nov 11, 2019
asfgit pushed a commit that referenced this pull request Dec 9, 2019
…tion to the metrics system

## What changes were proposed in this pull request?

This PR proposes to add instrumentation of memory usage via the Spark Dropwizard/Codahale metrics system. Memory usage metrics are available via the Executor metrics, recently implemented as detailed in https://issues.apache.org/jira/browse/SPARK-23206.
Additional notes: This takes advantage of the metrics poller introduced in #23767.

## Why are the changes needed?
Executor metrics provide many useful insights on memory usage, in particular on the usage of storage memory and executor memory. This is useful for troubleshooting. Having the information in the metrics system allows adding those metrics to Spark performance dashboards and studying memory usage as a function of time, as in the example graph https://issues.apache.org/jira/secure/attachment/12962810/Example_dashboard_Spark_Memory_Metrics.PNG

## Does this PR introduce any user-facing change?
Adds an `ExecutorMetrics` source to publish executor metrics via the Dropwizard metrics system. Details of the available metrics are in docs/monitoring.md.
Adds configuration parameter `spark.metrics.executormetrics.source.enabled`
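A hedged example of enabling the new source from application code (the key is the one named above; the placement and value are illustrative, and sinks are configured separately, e.g. via metrics.properties):

    import org.apache.spark.SparkConf

    // Illustrative: publish executor memory metrics through the Dropwizard metrics system.
    val conf = new SparkConf()
      .set("spark.metrics.executormetrics.source.enabled", "true")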

## How was this patch tested?

Tested on a YARN cluster and with an existing setup for a Spark dashboard based on InfluxDB and Grafana.

Closes #24132 from LucaCanali/memoryMetricsSource.

Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Imran Rashid <irashid@cloudera.com>