[SPARK-27189][CORE] Add Executor metrics and memory usage instrumentation to the metrics system #24132
Conversation
def updateCounters(memorySource: MemorySource,
    metricsUpdates: ExecutorMetrics): Unit = {
Nit: empty line not needed
val METRIC_PROCESS_TREE_OTHER_RSSMEMORY =
  metricRegistry.counter(MetricRegistry.name("ProcessTreeOtherRSSMemory"))

def updateCounters(memorySource: MemorySource,
The updateCounters is a member function and it should use the counters of its own instance, so the memorySource argument is not needed. This way memorySource.updateCounters(memorySource, executorUpdates) will be just memorySource.updateCounters(executorUpdates).
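A minimal sketch of the suggested shape (hedged; class and metric names are illustrative, based on the fragments quoted in this review):

```scala
package org.apache.spark.metrics.source

import com.codahale.metrics.MetricRegistry

import org.apache.spark.executor.ExecutorMetrics

// Hedged sketch (not the PR's final code): updateCounters as a member method
// that only touches this instance's counters, so no MemorySource argument.
private[spark] class MemorySource extends Source {
  override val sourceName = "MemoryMetrics"
  override val metricRegistry = new MetricRegistry()

  val METRIC_JVM_HEAP_MEMORY =
    metricRegistry.counter(MetricRegistry.name("JVMHeapMemory"))

  def updateCounters(metricsUpdates: ExecutorMetrics): Unit = {
    // Counters only support deltas, so compute one from the previous value.
    val prev = METRIC_JVM_HEAP_MEMORY.getCount()
    METRIC_JVM_HEAP_MEMORY.inc(metricsUpdates.getMetricValue("JVMHeapMemory") - prev)
  }
}
```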
Indeed. Thanks for the comment.
docs/monitoring.md
@@ -1006,6 +1009,37 @@ when running in local mode.

- namespace=JVMCPU
  - jvmCpuTime

- namespace=MemoryMetrics
  - **note:** MemoryMetrics counters are updated as part the executor metrics heart beat
Nit: in "the executor metrics heart beat", I do not think the word "metrics" is correct there; it is the executor's heartbeat. (I have only seen heartbeat written as one word, e.g. in the config description of spark.executor.heartbeatInterval.)
Good points, thanks.
memorySource.METRIC_JVM_HEAP_MEMORY.inc(
def updateCounters(metricsUpdates: ExecutorMetrics): Unit = {
  this.METRIC_JVM_HEAP_MEMORY.inc(
The this. prefix is not needed; it is implicitly there when you leave it out.
cc @squito
OK to test
@LantaoJin, @edwinalu, @rezasafi, given your work on SPARK-23206, would you be interested in commenting on this PR?
@LucaCanali Thanks for the work here. I will look in more detail later this week. The PR for SPARK-26357 is also open. Do you need that to be merged for this?
Thanks @rezasafi for the comment. This PR is independent of SPARK-26357. I guess it would be beneficial to somehow merge the two ideas/proposals in one PR?
Sorry for the delay reviewing this.
I like this idea, but I feel like we need to think through the approach a bit more. In fact it's making me realize that some things which were bugging me about #23306, and that have made me drag my feet on that one, really need to be thought through holistically before we do this.
It's worth first contrasting these two approaches -- Reza's approach was complicated by a desire to avoid repeatedly polling procfs, since a whole batch of metrics can be combined in one go, and since it has a non-zero performance overhead. That's complicated by the Gauge interface from Dropwizard, where you don't actively set the value; instead you get polled whenever it decides to.
This PR takes a different approach -- instead of exposing it as a Gauge, it exposes it as a Counter (where you choose when to update the value), and you do that as part of the heartbeat. That lets you control when you update the values of the metrics, which entirely sidesteps the problem above.
But that has its own problems. First, the Counter API doesn't let you set the value directly, it only lets you set an increment. You're getting around this by polling for the current value, but this won't be correct if we ever update from multiple threads. We're not doing that now, but it's just opening this up to subtle problems down the road.
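For reference, a hedged illustration (not code from this PR) of the two Dropwizard styles being contrasted, including the read-then-increment pattern described above:

```scala
import java.lang.management.ManagementFactory

import com.codahale.metrics.{Counter, Gauge, MetricRegistry}

object GaugeVsCounterSketch {
  val registry = new MetricRegistry()

  // Gauge: the metric system calls getValue whenever it reports, so the
  // producer does not control when the (possibly expensive) read happens.
  registry.register(MetricRegistry.name("JVMHeapMemoryGauge"), new Gauge[Long] {
    override def getValue: Long =
      ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed
  })

  // Counter: the producer decides when to update, but can only add a delta.
  // "Setting" an absolute value means read-then-increment, which is not atomic
  // and could drift if two threads ever did it concurrently.
  val counter: Counter = registry.counter(MetricRegistry.name("JVMHeapMemoryCounter"))

  def setTo(newValue: Long): Unit = {
    val prev = counter.getCount()   // read ...
    counter.inc(newValue - prev)    // ... then increment: a race window in between
  }
}
```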
But even more importantly, this doesn't seem to leverage the advantages of dropwizard metrics. I don't use it extensively myself, so I may be wrong here -- but from what I gather one major advantage of that system is that it can capture a much higher volume of metrics from each executor. We're taking great pains to try to capture as much info as we can in these metrics, but without putting too much stress on the driver (see eg. #23767).
I think the right thing to do here, to cover both this and Reza's approach, is actually to have something like CachedGauge, but our own implementation of it, which would allow the metric system to poll the metrics once but get the values for all of them. And combined with #23767, it might actually not ever do the polling itself; it could just get updated from the ExecutorMetricsPoller (need to think about this part a bit more).
There are a few other minor things to think about -- some of these metrics might already be exposed another way, so it might be weird to expose them again (eg. all JVM metrics via JMXSink), but I dunno the right way to unify them. Also, I am wondering if we should have an idea of a "type" of metric, sort of like the dropwizard metrics, to distinguish metrics which capture an instantaneous value (eg. current JVM heap size) vs. metrics where a rolling delta might make more sense (eg. compute time or gc time). I had been deferring thinking about how that is exposed in Spark's own metric reporting, but it might need to be addressed before we add these here, so they're added to the metrics system the correct way.
Also cc @LantaoJin @edwinalu @wypoon who have been looking at related issues.
override val sourceName = "MemoryMetrics"

val METRIC_JVM_HEAP_MEMORY =
  metricRegistry.counter(MetricRegistry.name("JVMHeapMemory"))
ExecutorMetricType.metricToOffset lets you avoid all of this boilerplate -- I think you could replace all of this with:
val nameToCodahaleMetric = ExecutorMetricType.metricToOffset.map { case (name, _) =>
  name -> metricRegistry.counter(MetricRegistry.name(name))
}

def updateCounters(metricsUpdates: ExecutorMetrics): Unit = {
  nameToCodahaleMetric.foreach { case (name, counter) =>
    val prevValue = counter.getCount()
    counter.inc(metricsUpdates.getMetricValue(name) - prevValue)
  }
}
Excellent suggestion, thanks!
Thank you @squito for taking time to review this. I agree with your questions and proposals for an improved implementation. I'd just like to add to the thread the general idea/motivation of why I believe adding this type of metrics is useful. We use Spark metrics to feed a dashboard for performance monitoring of Spark applications; I have an example of how this works for us in https://db-blog.web.cern.ch/blog/luca-canali/2019-02-performance-dashboard-apache-spark
As you mention, some memory-related metrics are also available from other sources, notably the metrics defined in org.apache.spark.metrics.source.JvmSource. However, JvmSource is optional, so we could think that there is not really duplication of logged info: one could use the metrics introduced here instead of activating JvmSource, for example.
The only reasons that, in #23306, I went for the complex approach were:
I think #23306 is better, because of the caching. It may alleviate the problem of unified sampling, because we won't compute the metrics at defined intervals. It won't totally resolve the issue though; I think to actually resolve that, a more sophisticated solution is needed.
Jenkins, ok to test
@LucaCanali thanks for sharing that info. This is a bit of a sidetrack here -- but can you explain why you want to use the metrics system instead of just using Spark's own tracking in event logs? I always thought the major advantage was that if you had a distributed metrics collection mechanism, you could collect a much higher rate of metrics. But from the blog post you linked to, it looks like you're only pulling data in every 10 seconds -- the same rate as Spark's heartbeat.

I suppose there are other advantages -- (1) there is a bigger ecosystem around graphite for plotting etc., and (2) even though the driver gets updates every 10 seconds, we don't put them all into the event log to avoid huge event logs, we only record per-stage peaks.

The major problem I have had in the past when I tried to use the metric system (which was a long time ago) is that it didn't know anything about tasks & scheduling. So I'd wonder why one executor had a peak value much higher than others at time X, and only after a lot more digging I'd discover it was because it was the only one actually running a task at that time. The Spark-specific reporting, in the UI and what is in the event logs, handles that complication. I'm just trying to make sure we have a good explanation for why there are these different systems, the pros and cons of each, and what the end goal here should be.

I do like that this approach here lets us easily take all the work for ExecutorMetrics and easily put them in the other system too, with a minimal amount of work. I actually wonder if we should go the other way -- should there be a way to take everything from the MetricsSystem and put it into ExecutorMetrics as well? Maybe not, just something to consider.

Anyway, for this specific change -- I'm currently thinking the right thing to do is to wait for #23767, and then combine this and #23306 into one change. We should use Gauges, we should be able to expose metrics at a higher frequency than heartbeats, and we should avoid excessive polling by using our own version, something like:

class ExecutorMetricGauges {
  // or maybe this needs to be an AtomicLongArray or an AtomicReference[Array[Long]],
  // need to think about this ...
  @volatile var snapshot: Array[Long] = _

  // called by ExecutorMetricsPoller
  def updateSnapshot(snapshot: Array[Long]) ...

  private class ExecutorMetricGauge(idx: Int) extends Gauge[Long] {
    def getValue: Long = snapshot(idx)
  }

  // This looks like a bunch of independent gauges as far the metric system is
  // concerned, but actually they're all using one shared snapshot.
  val gauges = (0 until ExecutorMetricType.numMetrics).map { idx =>
    new ExecutorMetricGauge(idx)
  }.toIndexedSeq

  def register(registry: MetricRegistry): Unit = {
    ExecutorMetricType.metricToOffset.foreach { case (name, idx) =>
      registry.register(MetricRegistry.name(name), gauges(idx))
    }
  }
}
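A short hypothetical usage sketch of the class above (metricRegistry and env are assumed to come from the surrounding executor setup):

```scala
// Hypothetical wiring sketch for the class above: register once at executor
// start, then let the poller refresh the shared snapshot; the metric system
// only ever reads the latest stored values.
val gaugesSource = new ExecutorMetricGauges()
gaugesSource.register(metricRegistry)

// Periodically, e.g. from the ExecutorMetricsPoller:
val latest: Array[Long] = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
gaugesSource.updateSnapshot(latest)
```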
Test build #104053 has finished for PR 24132 at commit
@squito I like your proposed solution for implementing the proposed change/improvement. As for your question on why I find the metrics system useful, here are some comments.

I like both the fine-grained drill-down capabilities of the event log, with all the details of task metrics, and the Dropwizard metrics system, which I find easy to use for building performance dashboards. One of the advantages of the performance dashboard is to provide a unified view of what the system is doing at a particular point in time in terms of system resource utilization and key workload metrics. For example, this allows us to answer key questions like: how many active tasks are there now, and how does this compare with the number of available cores? What fraction of the available task time is spent on CPU, garbage collection, shuffle and other activities? How much data are we reading/writing? How much memory is being used? BTW, not all time-based metrics are instrumented yet for a full time-based performance analysis, but what is there is already reasonably useful. Notably, in Spark (and distributed systems in general) there can be significant periods of time spent on low system activity (such as the number of active tasks dropping to very low values despite the number of available cores) due to stragglers, data skew or several other possible reasons, which the dashboard identifies naturally. You may still need task-level metrics data, such as the event log, to further drill down on the root causes.

In terms of architecture, I like that the Dropwizard metrics system sends the metrics directly from the executors to the backend DB (a graphite endpoint/InfluxDB in my case). Systems based on Spark listeners, such as the event log, have to go via the driver, and this can be a bit of a bottleneck in some cases (for example with many executors and many short tasks).

I have tried to summarize some of the main points I have come across on this topic so far. I guess there is more out there and there is room to write a longer comment/blog/presentation at one point, maybe also to see if more people have opinions on these topics.
05c3392 to 756e849
Test build #113111 has finished for PR 24132 at commit
Test build #113112 has finished for PR 24132 at commit
Test build #113113 has finished for PR 24132 at commit
Hi @squito, I have updated the PR following your ideas and example code. I'll be very interested to hear your comments.
29f0efd to 3f2e8ff
Test build #113595 has finished for PR 24132 at commit
Sorry for the delays from me, @LucaCanali. Overall I like this version; a few things need to be fixed.
if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
  new ExecutorMetricsSource
} else {
  null
Use an Option here, like we do for other optional members of SparkContext.
if (conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
  new ExecutorMetricsSource
} else {
  null
same here on using an Option
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.metrics.source.Source

private[spark] class ExecutorMetricsSource extends Source {
I'd include some comments here explaining why this is set up this way (e.g. we're exposing metrics that are a little expensive to check, so there are a couple of optimizations: (1) the procfs metrics are gathered all in one go; (2) the values are not checked too often; (3) we re-use the cached values for Spark internal stage-level metrics).
def register: Unit = {
  // This looks like a bunch of independent gauges as far the metric system
  // is concerned, but actually they're all using one shared snapshot.
I know this was my comment originally, but I meant it more to compare against other approaches; I think it's confusing for someone new to this trying to understand what is going on here. I'd change it to something like:
This takes the values of the metrics we have already computed and stored in our snapshot, and exposes them as individual gauges for the metric system. This means the value never gets updated when polled from the metric system, only when we decide the ExecutorMetricPoller updates the snapshot.
@@ -622,6 +629,10 @@ class SparkContext(config: SparkConf) extends Logging {
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _env.metricsSystem.registerSource(new JVMCPUSource())
    if (executorMetricsSource != null) {
      executorMetricsSource.register
      env.metricsSystem.registerSource(executorMetricsSource)
Instead, how about changing these two lines to executorMetricsSource.register(env.metricsSystem) and pushing that second call inside? Makes it a bit more clear for the user of an ExecutorMetricsSource.
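A small hedged sketch of what that could look like inside ExecutorMetricsSource (it assumes the metricRegistry and gauges members discussed elsewhere in this review):

```scala
import org.apache.spark.metrics.MetricsSystem

// Sketch: register the individual gauges and the source itself in one call, so
// callers only need a single line: executorMetricsSource.register(env.metricsSystem)
def register(metricsSystem: MetricsSystem): Unit = {
  ExecutorMetricType.metricToOffset.foreach { case (name, idx) =>
    metricRegistry.register(MetricRegistry.name(name), gauges(idx))
  }
  metricsSystem.registerSource(this)
}
```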
val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
executorMetricsSource.updateMetricsSnapshot(currentMetrics)
Won't this be an NPE if it's not enabled? Another reason to use an Option here.
Thanks @squito for reviewing this. I'll take care of the changes. BTW, do you have any opinions on the default value for METRICS_EXECUTORMETRICS_SOURCE_ENABLED?
Test build #114402 has finished for PR 24132 at commit
Since I don't really use the metrics system much myself, I don't have a strong opinion. How important is it to keep the set of exported metrics small (for load on the metrics collection agent / clutter on the display)? I think I'd probably put it on by default, but I want to hear your thoughts.
I think it should be OK to have ExecutorMetrics on by default; it is the case for most metrics at present. BTW, the JVM-related memory metrics in ExecutorMetrics can also be used as a partial (but in most cases good enough) replacement for the optional JvmSource.
Test build #114623 has finished for PR 24132 at commit
Test build #114737 has finished for PR 24132 at commit
A couple more small things, but basically looks good.
_executorMetricsSource match {
  case Some(executorMetricsSource: ExecutorMetricsSource) =>
    executorMetricsSource.register(_env.metricsSystem)
  case None => None
This is only called for the side effect, so you can use:
_executorMetricsSource.foreach(_.register(_env.metricsSystem))
  case Some(executorMetricsSource: ExecutorMetricsSource) =>
    executorMetricsSource.updateMetricsSnapshot(currentMetrics)
  case None => None
}
same here on using option.foreach
executorMetricsSource match {
  case Some(executorMetricsSource: ExecutorMetricsSource) =>
    executorMetricsSource.register(env.metricsSystem)
  case None => None
same here
executorMetricsSource match {
  case Some(executorMetricsSource: ExecutorMetricsSource) =>
    executorMetricsSource.updateMetricsSnapshot(latestMetrics)
  case None => None
same
// (2) procfs metrics are gathered all in one-go and only conditionally:
//     if the /proc filesystem exists
//     and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true
//     and spark.eventLog.logStageExecutorMetrics.enabled=true
Can you change to javadoc style (/**)? It just helps in IDEs.
Also some nits: blank line after the first summary line, end the sentence with a period, and the words "to" and "so" are missing in "Metrics related to the memory system can be expensive to gather, so ...".
Test build #114895 has finished for PR 24132 at commit
lgtm
Thank you @squito for all the help and for taking time for this.
merged to master, thanks @LucaCanali
  metricsSnapshot = metricsUpdates
}

class ExecutorMetricGauge(idx: Int) extends Gauge[Long] {
I made this class private when I merged it
Hi, All.
Hi @dongjoon-hyun, sorry for my slow response, but just wanted to close the loop -- I think it's clear now those were from another change which has already been fixed.
Yes. Sorry for pinging here at that time. It was #26788.
What changes were proposed in this pull request?
This PR proposes to add instrumentation of memory usage via the Spark Dropwizard/Codahale metrics system. Memory usage metrics are available via the Executor metrics, recently implemented as detailed in https://issues.apache.org/jira/browse/SPARK-23206.
Additional notes: This takes advantage of the metrics poller introduced in #23767.
Why are the changes needed?
Executor metrics provide many useful insights on memory usage, in particular on the usage of storage memory and executor memory. This is useful for troubleshooting. Having the information in the metrics system allows adding those metrics to Spark performance dashboards and studying memory usage as a function of time, as in the example graph https://issues.apache.org/jira/secure/attachment/12962810/Example_dashboard_Spark_Memory_Metrics.PNG
Does this PR introduce any user-facing change?
Adds an ExecutorMetrics source to publish executor metrics via the Dropwizard metrics system. Details of the available metrics are in docs/monitoring.md. Adds the configuration parameter spark.metrics.executormetrics.source.enabled.
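As an illustration of the new flag, a minimal sketch using the standard SparkConf API (the application name is illustrative; the key is the one named above):

```scala
import org.apache.spark.SparkConf

// Minimal sketch: turn the new executor metrics source off for an application.
val conf = new SparkConf()
  .setAppName("executor-metrics-example")
  .set("spark.metrics.executormetrics.source.enabled", "false")
```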
How was this patch tested?
Tested on YARN cluster and with an existing setup for a Spark dashboard based on InfluxDB and Grafana.