[SPARK-27189][CORE] Add Executor metrics and memory usage instrumentation to the metrics system #24132


Closed
wants to merge 6 commits

Conversation

@LucaCanali (Contributor) commented Mar 18, 2019

What changes were proposed in this pull request?

This PR proposes to add instrumentation of memory usage via the Spark Dropwizard/Codahale metrics system. Memory usage metrics are available via the Executor metrics, recently implemented as detailed in https://issues.apache.org/jira/browse/SPARK-23206.
Additional notes: This takes advantage of the metrics poller introduced in #23767.

Why are the changes needed?

Executor metrics provide many useful insights into memory usage, in particular into the usage of storage memory and executor memory. This is useful for troubleshooting. Having this information in the metrics system makes it possible to add these metrics to Spark performance dashboards and to study memory usage as a function of time, as in the example graph https://issues.apache.org/jira/secure/attachment/12962810/Example_dashboard_Spark_Memory_Metrics.PNG

Does this PR introduce any user-facing change?

Adds an ExecutorMetrics source to publish executor metrics via the Dropwizard metrics system. Details of the available metrics are in docs/monitoring.md.
Adds the configuration parameter spark.metrics.executormetrics.source.enabled.

How was this patch tested?

Tested on YARN cluster and with an existing setup for a Spark dashboard based on InfluxDB and Grafana.


def updateCounters(memorySource: MemorySource,
metricsUpdates: ExecutorMetrics): Unit = {

Contributor:

Nit: empty line not needed

val METRIC_PROCESS_TREE_OTHER_RSSMEMORY =
metricRegistry.counter(MetricRegistry.name("ProcessTreeOtherRSSMemory"))

def updateCounters(memorySource: MemorySource,
@attilapiros (Contributor) commented Mar 18, 2019:

updateCounters is a member function, so it should use the counters of its own instance; the memorySource argument is not needed.

This way, memorySource.updateCounters(memorySource, executorUpdates) becomes just memorySource.updateCounters(executorUpdates).
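For illustration, a minimal sketch of the simplified member function this suggests (the body shown is only an example, reusing the counter name and the read-then-increment pattern that appear elsewhere in this review):

    // inside MemorySource: operate on this instance's own counters,
    // no memorySource parameter needed
    def updateCounters(metricsUpdates: ExecutorMetrics): Unit = {
      METRIC_PROCESS_TREE_OTHER_RSSMEMORY.inc(
        metricsUpdates.getMetricValue("ProcessTreeOtherRSSMemory") -
          METRIC_PROCESS_TREE_OTHER_RSSMEMORY.getCount())
      // ... same pattern for the remaining counters
    }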

Contributor Author:

Indeed. Thanks for the comment.

@@ -1006,6 +1009,37 @@ when running in local mode.

- namespace=JVMCPU
- jvmCpuTime

- namespace=MemoryMetrics
- **note:** MemoryMetrics counters are updated as part the executor metrics heart
Contributor:

Nit: In "the executor metrics heart beat" I don't think the word "metrics" is right; it is the executor's heartbeat. Also, I have only seen "heartbeat" written as one word, e.g. in the config description of spark.executor.heartbeatInterval.

Contributor Author:

Good points, thanks.


memorySource.METRIC_JVM_HEAP_MEMORY.inc(
def updateCounters(metricsUpdates: ExecutorMetrics): Unit = {
this.METRIC_JVM_HEAP_MEMORY.inc(
Contributor:

The this. is not needed, as it is implicitly there when you leave it out.

@attilapiros (Contributor):

cc @squito

@squito (Contributor) commented Mar 18, 2019:

OK to test

@LucaCanali (Contributor Author):

@LantaoJin, @edwinalu, @rezasafi, given your work on SPARK-23206, would you be interested in commenting on this PR?

@rezasafi (Contributor):

@LucaCanali Thanks for the work here. I will look at it in more detail later this week. The PR for SPARK-26357 is also open. Do you need that to be merged for this?

@LucaCanali (Contributor Author):

Thanks @rezasafi for the comment. This PR is independent of SPARK-26357. I guess it would be beneficial to somehow merge the two ideas/proposals in one PR?

@squito (Contributor) left a comment:

Sorry for the delay reviewing this.

I like this idea, but I feel like we need to think through the approach a bit more. In fact it's making me realize that some things which were bugging me about #23306, and that have made me drag my feet on that one, really need to be thought through holistically before we do this.

It's worth first contrasting these two approaches -- Reza's approach was complicated by a desire to avoid repeatedly polling procfs, since a whole batch of metrics can be combined in one go, and since polling has a non-zero performance overhead. That's complicated by the Gauge interface from Dropwizard, where you don't actively set the value; instead you get polled whenever it decides to.

This PR takes a different approach -- instead of exposing it as a Gauge, it exposes it as a Counter (where you choose when to update the value), and you do that as part of the heartbeat. That lets you control when you update the values of the metrics, which entirely sidesteps the problem above.

But that has its own problems. First, the Counter API doesn't let you set the value directly; it only lets you apply an increment. You're getting around this by polling for the current value, but this won't be correct if we ever update from multiple threads. We're not doing that now, but it's just opening this up to subtle problems down the road.
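For illustration, a minimal sketch of the read-then-increment pattern being discussed, using the Dropwizard Counter API (counter and newValue are placeholders, not names from the PR):

    // Dropwizard's Counter cannot be set directly; "setting" it means reading
    // the current count and incrementing by the difference.
    val delta: Long = newValue - counter.getCount() // read the current value ...
    counter.inc(delta)                              // ... then apply the increment
    // The read and the increment are two separate calls, so concurrent updaters
    // could interleave between them and corrupt the total.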

But even more importantly, this doesn't seem to leverage the advantages of Dropwizard metrics. I don't use it extensively myself, so I may be wrong here -- but from what I gather, one major advantage of that system is that it can capture a much higher volume of metrics from each executor. We're taking great pains to try to capture as much info as we can in these metrics, but without putting too much stress on the driver (see e.g. #23767).

I think the right thing to do here, to cover both this and Reza's approach, is actually to have something like CachedGauge, but our own implementation of it, which would allow the metric system to poll the metrics once but get the values for all of them. And combined with #23767, it might actually not ever do the polling itself; it could just get updated from the ExecutorMetricsPoller (need to think about this part a bit more).

There are a few other minor things to think about -- some of these metrics might already be exposed another way, so it might be weird to expose them again (e.g. all JVM metrics via JMXSink), but I don't know the right way to unify them. Also, I am wondering if we should have an idea of a "type" of metric, sort of like the Dropwizard metrics, to distinguish metrics which capture an instantaneous value (e.g. current JVM heap size) vs. metrics where a rolling delta might make more sense (e.g. compute time or GC time). I had been deferring thinking about how that is exposed in Spark's own metric reporting, but it might need to be addressed before we add these here, so they're added to the metrics system the correct way.

Also cc @LantaoJin @edwinalu @wypoon who have been looking at related issues.

override val sourceName = "MemoryMetrics"

val METRIC_JVM_HEAP_MEMORY =
metricRegistry.counter(MetricRegistry.name("JVMHeapMemory"))
Contributor:

ExecutorMetricType.metricToOffset lets you avoid all of this boilerplate -- I think you could replace all of this with

  val nameToCodahaleMetric = ExecutorMetricType.metricToOffset.map { case (name, _) =>
      name -> metricRegistry.counter(MetricRegistry.name(name))
  }

  def updateCounters(metricsUpdates: ExecutorMetrics): Unit = {
    nameToCodahaleMetric.foreach { case (name, counter) =>
      val prevValue = counter.getCount()
      counter.inc(metricsUpdates.getMetricValue(name) - prevValue)
    }
  }

Contributor Author:

Excellent suggestion, thanks!

@LucaCanali (Contributor Author):

Thank you @squito for taking the time to review this. I agree with your questions and proposals for an improved implementation.

I'd just like to add to the thread the general idea/motivation of why I believe adding this type of metrics is useful. We use Spark metrics to feed a dashboard for performance monitoring of Spark applications. I have an example of how this works for us in https://db-blog.web.cern.ch/blog/luca-canali/2019-02-performance-dashboard-apache-spark
Memory/executor metrics, as I added them while testing this, appear to be a useful addition to the performance dashboard. In particular, I very much like the detail on Spark memory usage exposed by the storage memory and execution memory metrics.

As you mention, some memory-related metrics are also available from other sources, notably the metrics defined in org.apache.spark.metrics.source.JvmSource. However, JvmSource is optional, so there is not really a duplication of logged info: one could use the metrics introduced here instead of activating JvmSource, for example.

@rezasafi (Contributor) commented Mar 27, 2019:

The only reasons that, in #23306, I went for the complex approach were:

  • We didn't want to reintroduce existing metrics, so the only focus was on procfs metrics.

  • We didn't want to rely on the heartbeat, while the metrics system has its own polling period setting, etc.

I think #23306 is better because of the caching. It may alleviate the problem of unified sampling, because we won't compute the metrics at defined intervals. It won't totally resolve the issue, though. I think that to actually resolve that, a more sophisticated solution is needed.

@squito (Contributor) commented Mar 28, 2019:

Jenkins, ok to test

@squito (Contributor) commented Mar 28, 2019:

@LucaCanali thanks for sharing that info. This is a bit of a sidetrack here -- but can you explain why you want to use the metrics system instead of just using Spark's own tracking in event logs? I always thought the major advantage was that if you had a distributed metrics collection mechanism, you could collect metrics at a much higher rate. But from the blog post you linked to, it looks like you're only pulling data in every 10 seconds -- the same rate as Spark's heartbeat.

I suppose there are other advantages -- (1) there is a bigger ecosystem around Graphite for plotting etc., and (2) even though the driver gets updates every 10 seconds, we don't put them all into the event log (to avoid huge event logs); we only record per-stage peaks.

The major problem I have had in the past when I tried to use the metrics system (which was a long time ago) is that it didn't know anything about tasks & scheduling. So I'd wonder why one executor had a peak value much higher than the others at time X, and only after a lot more digging would I discover it was because it was the only one actually running a task at that time. The Spark-specific reporting, in the UI and in the event logs, handles that complication.

I'm just trying to make sure we have a good explanation for why there are these different systems, the pros and cons of each, and what the end goal here should be.

I do like that the approach here lets us take all the work for ExecutorMetrics and put it in the other system too, with a minimal amount of work. I actually wonder if we should go the other way -- should there be a way to take everything from the MetricsSystem and put it into ExecutorMetrics as well? Maybe not, just something to consider.

Anyway, for this specific change -- I'm currently thinking the right thing to do is to wait for #23767, and then combine this and #23306 into one change. We should use Gauges, we should be able to expose metrics at a higher frequency than heartbeats, and we should avoid excessive polling by using our own version of CachedGauge which just references a shared metric snapshot. Something like

class ExecutorMetricGauges {
  // or maybe this needs to be an AtomicLongArray or an AtomicReference[Array[Long]], 
  // need to think about this ...
  @volatile var snapshot: Array[Long] = _

  // called by ExecutorMetricsPoller
  def updateSnapshot(snapshot: Array[Long]) ...

  private class ExecutorMetricGauge(idx: Int) extends Gauge[Long] {
    def getValue: Long = snapshot(idx)
  }

  // This looks like a bunch of independent gauges as far as the metric
  // system is concerned, but actually they're all using one shared snapshot.
  val gauges = (0 until ExecutorMetricType.numMetrics).map { idx => new ExecutorMetricGauge(idx) }.toIndexedSeq

  def register(registry: MetricRegistry): Unit = {
    ExecutorMetricType.metricToOffset.foreach { case (name, idx) =>
      registry.register(MetricRegistry.name(name), gauges(idx))
    }
  }
}
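For example, the polling side might then look roughly like this (a sketch; executorMetricGauges is a placeholder for a shared instance of the class above, and ExecutorMetrics.getCurrentMetrics is the existing helper that returns the raw Array[Long] of metric values):

    // on the poller's schedule, inside the executor:
    val latest: Array[Long] = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
    executorMetricGauges.updateSnapshot(latest)
    // the registered gauges then serve values from this shared snapshot
    // whenever a Dropwizard sink polls them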

@SparkQA commented Mar 28, 2019:

Test build #104053 has finished for PR 24132 at commit 05c3392.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LucaCanali (Contributor Author):

@squito I like your proposed solution for implementing this change/improvement.

As for your question on why I find the metrics system useful, here are some comments:

I like both the fine-grained drill-down capabilities of the event log, with all the details of task metrics, and the Dropwizard metrics system, which I find easy to use for building performance dashboards.
SPARK-22190 has bridged part of the gap between the two by exposing executor task metrics via the Dropwizard metrics system. I also find SPARK-25228 (executor CPU time instrumentation) useful.

One of the advantages of the performance dashboard is that it provides a unified view of what the system is doing at a particular point in time, in terms of system resource utilization and key workload metrics. For example, this makes it possible to answer key questions like: how many active tasks are there now, and how does this compare with the number of available cores? What fraction of the available task time is spent on CPU, garbage collection, shuffle, and other activities? How much data are we reading/writing? How much memory is being used? BTW, not all time-based metrics are instrumented yet for a full time-based performance analysis, but what is there is already reasonably useful.
Importantly, the performance dashboard naturally displays data as graphs over time, which allows studying performance and system utilization over time.

Notably, in Spark (and distributed systems in general) there can be significant periods of low system activity (such as the number of active tasks dropping to very low values despite the number of available cores) due to stragglers, data skew, or several other possible reasons, which the dashboard identifies naturally. You may still need task-level metrics data, such as the event log, to further drill down on the root causes.

In terms of architecture, I like that the Dropwizard metrics system sends the metrics directly from the executors to the backend DB (a Graphite endpoint/InfluxDB in my case). Systems based on Spark listeners, such as the event log, have to go via the driver, and this can be a bit of a bottleneck in some cases (for example with many executors and many short tasks).

I have tried to summarize some of the main points I have come across on this topic so far. I guess there is more out there, and there is room to write a longer comment/blog/presentation at some point, maybe also to see if more people have opinions on this topic.

@LucaCanali (Contributor Author):

Hi @squito, I see that #23767 is now merged, so I guess we can proceed with your suggestion to implement ExecutorMetricGauges?

@SparkQA commented Nov 1, 2019:

Test build #113111 has finished for PR 24132 at commit 756e849.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ExecutorMetricsSource extends Source
  • class ExecutorMetricGauge(idx: Int) extends Gauge[Long]

@SparkQA commented Nov 1, 2019:

Test build #113112 has finished for PR 24132 at commit 9cde62c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 1, 2019:

Test build #113113 has finished for PR 24132 at commit 29f0efd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LucaCanali (Contributor Author):

Hi @squito, I have updated the PR following your ideas and example code. I'll be very interested to hear your comments.

@SparkQA commented Nov 11, 2019:

Test build #113595 has finished for PR 24132 at commit 3f2e8ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LucaCanali LucaCanali changed the title [SPARK-27189][CORE] Add executor-level memory usage metrics to the metrics system [SPARK-27189][CORE] Add Executor metrics and memory usage instrumentation to the metrics system Nov 11, 2019
@squito (Contributor) left a comment:

Sorry for the delays from me, @LucaCanali. Overall I like this version; there are a few things that need to be fixed.

if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
new ExecutorMetricsSource
} else {
null
Contributor:

use an Option here, like we do for other optional members of SparkContext
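For illustration, a minimal sketch of the Option-based initialization being suggested (using the field and config names that appear in the snippets in this thread):

    private val _executorMetricsSource: Option[ExecutorMetricsSource] =
      if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
        Some(new ExecutorMetricsSource)
      } else {
        None
      }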

if (conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
new ExecutorMetricsSource
} else {
null
Contributor:

same here on using an Option

import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.metrics.source.Source

private[spark] class ExecutorMetricsSource extends Source {
Contributor:

I'd include some comments here explaining why this is set up this way (e.g. we're exposing metrics that are a little expensive to check, so there are a couple of optimizations: (1) the procfs metrics are gathered all in one go; (2) the values are not checked too often; (3) we re-use the cached values for Spark's internal stage-level metrics).


def register: Unit = {
// This looks like a bunch of independent gauges as far the metric system
// is concerned, but actually they're all using one shared snapshot.
Contributor:

I know this was my comment originally, but I meant it more as a comparison against other approaches; I think it's confusing for someone new to this who is trying to understand what is going on here. I'd change it to something like:

This takes the values of the metrics we have already computed and stored in our snapshot, and exposes them as individual gauges for the metric system. This means the value never gets updated when polled from the metric system, only when the ExecutorMetricsPoller updates the snapshot.

@@ -622,6 +629,10 @@ class SparkContext(config: SparkConf) extends Logging {
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_env.metricsSystem.registerSource(new JVMCPUSource())
if (executorMetricsSource != null) {
executorMetricsSource.register
env.metricsSystem.registerSource(executorMetricsSource)
Contributor:

Instead, how about changing these two lines to executorMetricsSource.register(env.metricsSystem) and pushing that second call inside? That makes it a bit clearer for the user of an ExecutorMetricsSource.
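A minimal sketch of what that could look like inside ExecutorMetricsSource (the body is an assumption based on the gauges and metricRegistry members discussed above):

    def register(metricsSystem: MetricsSystem): Unit = {
      // register one gauge per executor metric, all backed by the shared snapshot
      ExecutorMetricType.metricToOffset.foreach { case (name, idx) =>
        metricRegistry.register(MetricRegistry.name(name), gauges(idx))
      }
      // then register this source with the Spark metrics system
      metricsSystem.registerSource(this)
    }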

val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
executorMetricsSource.updateMetricsSnapshot(currentMetrics)
Contributor:

Won't this be an NPE if it's not enabled? Another reason to use an Option here.

@LucaCanali (Contributor Author):

Thanks @squito for reviewing this. I'll take care of the changes. BTW, do you have any opinions on the default value for METRICS_EXECUTORMETRICS_SOURCE_ENABLED?

@SparkQA commented Nov 25, 2019:

Test build #114402 has finished for PR 24132 at commit 01c5925.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Nov 25, 2019:

do you have any opinions on the default value for METRICS_EXECUTORMETRICS_SOURCE_ENABLED?

Since I don't really use the metrics system much myself, I don't have a strong opinion. How important is it to keep the set of exported metrics small (for load on the metrics collection agent / clutter on the display)? I think I'd probably put it on by default, but I want to hear your thoughts.

@LucaCanali (Contributor Author):

I think it should be OK to have ExecutorMetrics on by default; that is the case for most metrics at present. BTW, the JVM-related memory metrics in ExecutorMetrics can also be used as a partial (but in most cases good-enough) replacement for the optional org.apache.spark.metrics.source.JvmSource.

@SparkQA commented Nov 29, 2019:

Test build #114623 has finished for PR 24132 at commit 1a0a3e9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 2, 2019:

Test build #114737 has finished for PR 24132 at commit 4ab39cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment:

A couple more small things, but basically this looks good.

_executorMetricsSource match {
case Some(executorMetricsSource: ExecutorMetricsSource) =>
executorMetricsSource.register(_env.metricsSystem)
case None => None
Contributor:

this is only called for the side effect, so you can use

_executorMetricsSource.foreach(_.register(_env.metricsSystem))

case Some(executorMetricsSource: ExecutorMetricsSource) =>
executorMetricsSource.updateMetricsSnapshot(currentMetrics)
case None => None
}
Contributor:

same here on using option.foreach
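That is, following the same pattern (assuming the surrounding Option is also named executorMetricsSource, as in the other snippets):

    executorMetricsSource.foreach(_.updateMetricsSnapshot(currentMetrics))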

executorMetricsSource match {
case Some(executorMetricsSource: ExecutorMetricsSource) =>
executorMetricsSource.register(env.metricsSystem)
case None => None
Contributor:

same here

executorMetricsSource match {
case Some(executorMetricsSource: ExecutorMetricsSource) =>
executorMetricsSource.updateMetricsSnapshot(latestMetrics)
case None => None
Contributor:

same

// (2) procfs metrics are gathered all in one-go and only conditionally:
// if the /proc filesystem exists
// and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true
// and spark.eventLog.logStageExecutorMetrics.enabled=true
Contributor:

Can you change this to javadoc style (/**)? It just helps in IDEs.

Also some nits: add a blank line after the first summary line, and end the sentence with a period.

missing "to" and "so" in "Metrics related to the memory system can be expensive to gather, so ..."

@SparkQA commented Dec 5, 2019:

Test build #114895 has finished for PR 24132 at commit bb5f2b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment:

lgtm

@LucaCanali (Contributor Author):

Thank you @squito for all the help and for taking time for this.

@asfgit asfgit closed this in 729f43f Dec 9, 2019
@squito (Contributor) left a comment:

merged to master, thanks @LucaCanali

metricsSnapshot = metricsUpdates
}

class ExecutorMetricGauge(idx: Int) extends Gauge[Long] {
Contributor:

I made this class private when I merged it

@dongjoon-hyun (Member):

Hi, All.
This seems to break lint-java.

[ERROR] src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java:[28,8] (imports) UnusedImports: Unused import - org.junit.Assert.
[ERROR] src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java:[25,8] (imports) UnusedImports: Unused import - org.junit.Assert.
[ERROR] src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java:[30,8] (imports) UnusedImports: Unused import - org.junit.Assert.
[ERROR] src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java:[21,8] (imports) UnusedImports: Unused import - org.junit.Assert.

@squito (Contributor) commented Dec 11, 2019:

Hi @dongjoon-hyun, sorry for my slow response, but I just wanted to close the loop -- I think it's clear now that those were from another change which has already been fixed.

@dongjoon-hyun (Member):

Yes. Sorry for pinging here at that time. It was #26788.

6 participants