[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API #21221
…xecutors REST API

Add new executor level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage memory), and expose them via the executors REST API. This information will help provide insight into how executor and driver JVM memory is used, and for the different memory regions. It can be used to help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.

- Add an ExecutorMetrics class, with jvmUsedMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics will be sent by executors to the driver as part of Heartbeat. A heartbeat will be added for the driver as well, to collect these metrics for the driver.
- Modify the EventLoggingListener to log ExecutorMetricsUpdate events if there is a new peak value for any of the memory metrics for an executor and stage. Only the ExecutorMetrics will be logged, and not the TaskMetrics, to minimize additional logging.
- Modify the AppStatusListener to record the peak values for each memory metric.
- Add the new memory metrics to the executors REST API.
Jenkins, ok to test
Test build #90087 has finished for PR 21221 at commit
Jenkins, retest this please
Test build #90422 has finished for PR 21221 at commit
}

/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
Why do we need this for the driver? If Spark runs in local mode, there's a local executor, which will report heartbeats.
With cluster mode, including YARN, there isn't a local executor, so the metrics for the driver would not be collected. Perhaps this could be modified to skip this step for local mode.
With cluster mode, including YARN, there isn't a local executor, so the metrics for the driver would not be collected.

Yes. But the problem is: can we use the executor's getCurrentExecutorMetrics() method to collect memory metrics for the driver? IIRC, the driver does not acquire memory from the execution memory pool, at least.
It's a bit redundant for fields that aren't used by the driver -- for the driver, execution memory gets set to 0.
 */
private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: Long) {
  // Executor for the heartbeat task
  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
I'm wondering, should the prefix name of the heartbeater thread be "executor-heartbeater"?
How about "heartbeater", since it could be for the driver as well? Alternatively, we can also pass in the name to the constructor.
"pass in the name to the constructor" is better (if we do need to do this for the driver).
Changed.
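For readers following along, here is a minimal sketch of what a name-parameterized Heartbeater could look like. This is an illustration based on the discussion, not the PR's exact code; Spark's ThreadUtils helper is replaced with plain java.util.concurrent.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Sketch: the thread-name prefix is a constructor parameter, so the same
// class can serve both the driver ("driver-heartbeater") and the executor
// ("executor-heartbeater").
class Heartbeater(reportHeartbeat: () => Unit, name: String, intervalMs: Long) {
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name) // thread named by the caller
        t.setDaemon(true)
        t
      }
    })

  /** Schedule the heartbeat task to run periodically. */
  def start(): Unit = {
    val task = new Runnable { override def run(): Unit = reportHeartbeat() }
    scheduler.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }

  /** Stop the heartbeat thread, waiting briefly for in-flight work. */
  def stop(): Unit = {
    scheduler.shutdown()
    scheduler.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

The driver would construct it with "driver-heartbeater" and the executor with "executor-heartbeater", which is what passing the name to the constructor buys over a hard-coded prefix.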
(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
executorMap.foreach {
  executorEntry => {
    for ((executorId, peakExecutorMetrics) <- executorEntry) {
How about "case (executorId, peakExecutorMetrics) =>"? It would be more readable.
The for loop (line 187) is going through the hashmap entries of executorId to peakExecutorMetrics, so there are multiple values. Could you please provide more detail for how "case (executorId, peakExecutorMetrics) =>" would work? If the for loop is OK, then I can add some comments.
I revisited the code, and I think you're right. My mistake, sorry.
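For context, the two styles under discussion are equivalent ways of destructuring map entries; a toy illustration (not the PR's actual code):

```scala
import scala.collection.mutable.ArrayBuffer

// Toy map of executorId -> peak metric value.
val peaks = Map("exec-1" -> 100L, "exec-2" -> 250L)

// for-comprehension style: the pattern (k, v) destructures each entry.
val viaFor = ArrayBuffer[String]()
for ((executorId, peak) <- peaks) viaFor += s"$executorId=$peak"

// case-pattern style suggested in the review: a partial-function literal
// destructures each entry directly inside foreach.
val viaCase = ArrayBuffer[String]()
peaks.foreach { case (executorId, peak) => viaCase += s"$executorId=$peak" }
```

Both visit every entry of the map; the choice is purely about readability.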
@@ -169,6 +179,27 @@ private[spark] class EventLoggingListener(

// Events that trigger a flush
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
  // log the peak executor metrics for the stage, for each executor
  val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
  val executorMap = liveStageExecutorMetrics.remove(
Do we always post a SparkListenerStageCompleted event for failed stages (I can't remember clearly)? If not, I think we should clean up other attempts of the same stage here.
Yes, it's safer to clean up earlier attempts -- I can add some code to iterate through earlier attemptIDs.
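A sketch of the suggested cleanup, with hypothetical simplified types (the real map holds PeakExecutorMetrics per executor, keyed by (stageId, attemptId)):

```scala
import scala.collection.mutable

// (stageId, attemptId) -> executorId -> peak value (simplified to Long here)
val liveStageExecutorMetrics =
  mutable.HashMap[(Int, Int), mutable.HashMap[String, Long]]()

// On stage completion: remove the completed attempt's entry, and also drop
// any earlier attempts of the same stage, which may have failed without a
// SparkListenerStageCompleted event of their own.
def removeStageAttempts(stageId: Int, attemptId: Int): Option[mutable.HashMap[String, Long]] = {
  val completed = liveStageExecutorMetrics.remove((stageId, attemptId))
  // collect stale keys first, so we don't remove while iterating
  val stale = liveStageExecutorMetrics.keys
    .filter { case (sid, aid) => sid == stageId && aid < attemptId }
    .toSeq
  stale.foreach(liveStageExecutorMetrics.remove)
  completed
}
```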
 * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no
 * values have been recorded yet.
 */
private[spark] class PeakExecutorMetrics {
Do we really need this class? It seems ExecutorMetrics can already do the same work.
I got some errors when trying to add methods to ExecutorMetrics. I don't remember the details, but can try this again.
Can you revisit this given the other refactoring that has taken place? And if you do need this extra class, please include a comment here explaining the metrics array and referencing MetricGetter.
ping
With ExecutorMetrics removed, it seems useful to have a class for tracking and setting peak metric values, that can be used by both EventLoggingListener and AppStatusListener.
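A minimal sketch of such a peak-tracking class -- an illustration of the idea, not Spark's actual PeakExecutorMetrics; metric identities are reduced to array indices here:

```scala
// Tracks per-metric peaks in an array; -1 means "nothing recorded yet",
// matching the convention described in the class doc above. The caller
// learns from the return value whether any peak changed, i.e. whether an
// event is worth logging.
class PeakMetricTracker(numMetrics: Int) {
  private val peaks = Array.fill(numMetrics)(-1L)

  /** Compare incoming values to stored peaks; returns true if any peak changed. */
  def compareAndUpdate(values: Array[Long]): Boolean = {
    require(values.length == peaks.length, "metric count mismatch")
    var updated = false
    var i = 0
    while (i < values.length) {
      if (values(i) > peaks(i)) {
        peaks(i) = values(i)
        updated = true
      }
      i += 1
    }
    updated
  }

  def peak(i: Int): Long = peaks(i)
}
```

Both EventLoggingListener and AppStatusListener could hold one of these per executor (and, for the event log, per stage).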
@@ -93,6 +94,10 @@ private[spark] class EventLoggingListener(
// Visible for tests only.
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

// map of live stages, to peak executor metrics for the stage
private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
Why should we track executors' memory metrics for each stage?
This is tracking peak metric values for executors for each stage, so that the peak values for the stage can be dumped at stage end. The purpose is to reduce the amount of logging, to at most (number of stages) * (number of executors) ExecutorMetricsUpdate events.
I originally tried logging on each new peak value, resetting when a new stage begins -- this is simpler, but can lead to more events being logged.
Having stage level information is useful for users trying to identify which stages are more memory intensive. This information could be useful if they are trying to reduce the amount of memory used, since they would know which stages (and the relevant code) to focus on.
…enabled to enable/disable executor metrics update logging. Code review comments.
Test build #90613 has finished for PR 21221 at commit
some minor things as I do another round and page things back in
def getCurrentExecutorMetrics(
    memoryManager: MemoryManager,
    direct: BufferPoolMXBean,
    mapped: BufferPoolMXBean): ExecutorMetrics = {
Does it make more sense to move this inside Heartbeater? Then you don't need to pass in any BufferPoolMXBeans. Also rename to "getCurrentMemoryMetrics".
Yes, and easier to share the code between driver and executor.
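A sketch of the refactor under discussion: the metric-collecting code (here a standalone helper object rather than a method on Heartbeater) looks up the BufferPoolMXBeans itself via java.lang.management, so callers no longer pass them in. Names and the returned map shape are illustrative, not Spark's actual API.

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import scala.collection.JavaConverters._

object MemoryMetrics {
  // Look up the JVM's buffer pools ("direct", "mapped") once, internally,
  // instead of requiring callers to pass BufferPoolMXBeans in.
  private val bufferPools: Seq[BufferPoolMXBean] =
    ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala.toSeq

  /** Snapshot current JVM memory metrics, keyed by metric name. */
  def getCurrentMemoryMetrics(): Map[String, Long] = {
    val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed
    val pools = bufferPools.map(p => (p.getName + "PoolMemory") -> p.getMemoryUsed)
    Map("JVMHeapMemory" -> heap) ++ pools
  }
}
```

Because the lookup is internal, the same call site works for the driver heartbeat and the executor heartbeat.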
@@ -81,7 +84,7 @@ private[spark] class EventLoggingListener(
private val compressionCodecName = compressionCodec.map { c =>
  CompressionCodec.getShortName(c.getClass.getName)
}

logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecutorMetricsUpdates)
doesn't really seem necessary at all, definitely not at INFO level (and indentation is wrong).
Removed. Thanks, I hadn't meant to push that.
@@ -93,6 +96,10 @@ private[spark] class EventLoggingListener(
// Visible for tests only.
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

// map of live stages, to peak executor metrics for the stage
private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
  mutable.HashMap[String, PeakExecutorMetrics]]()
you could just import mutable.HashMap (added bonus -- fits on one line)
Changed.
  liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
}

// log the peak executor metrics for the stage, for each executor
I'd add a comment here that this will log metrics for all executors that were alive while the stage was running, whether or not they ran any tasks for that stage (I think that's what it will do here, right?)
Yes, it's all running executors, and it does not filter based on whether they ran tasks for the stage. I've updated the comment.
@@ -209,6 +210,16 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

/** driver heartbeat for collecting metrics */
private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, "driver-heartbeater",
lets not put this in the DAGScheduler please -- this class is fragile enough as it is :)
I think this should just go in SparkContext.
Moved.
  SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty))
}

/** Create an executor added event for the specified executor Id. */
added -> removed
though for that matter -- I'd just remove the doc comments on all these teeny helper methods
I'll remove -- they are pretty self-explanatory.
    i += 1
  }
  checkEvent(lines(i), event)
  i += 1
I found this pretty confusing at first. I suggest renaming i to logIdx and including a comment about the j loop. Also we tend to use (1 to 2).foreach, e.g.:

// just before the SparkListenerStageCompleted gets logged, we expect to get a
// SparkListenerExecutorMetricsUpdate for each executor
(1 to 2).foreach { _ =>
  checkExecutorMetricsUpdate(lines(logIdx), stageCompleted.stageInfo.stageId,
    expectedMetricsEvents)
  logIdx += 1
}
// also check that we get the expected SparkListenerStageCompleted
checkEvent(lines(logIdx), event)
logIdx += 1
Changed for both.
assert(line.contains(event.getClass.toString.split("\\.").last))
event match {
  case executorMetrics: SparkListenerExecutorMetricsUpdate =>
    JsonProtocol.sparkEventFromJson(parse(line)) match {
You can pull JsonProtocol.sparkEventFromJson(parse(line)) out to avoid repeating it, along with the type comparison:

val parsed = JsonProtocol.sparkEventFromJson(parse(line))
assert(parsed.getClass === event.getClass)
event match {
  ...

(Also, assertTypeError does something else entirely: http://doc.scalatest.org/2.2.6/#org.scalatest.Assertions)
Thanks, modified.
private def checkEvent(line: String, event: SparkListenerEvent): Unit = {
  assert(line.contains(event.getClass.toString.split("\\.").last))
  event match {
    case executorMetrics: SparkListenerExecutorMetricsUpdate =>
You're never using this with SparkListenerExecutorMetricsUpdate, right?
Nope, with the change in design to logging the executor metrics updates at stage end, this part is skipped -- I'll remove this.
  updated = true
}
if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
  _offHeapStorageMemory = executorMetrics.offHeapStorageMemory
I know spark has this kind of code all over the place already, but I really hate how error prone it is -- way too easy for a copy paste error to result in comparing the wrong two metrics, or updating the wrong value, or forgetting to update this when another metric is added, etc.
I just opened this edwinalu#1 as another way to do this that would eliminate a ton of boilerplate IMO.
Thanks! This is cleaner, and will make it easier to add new metrics. It is very easy to have a copy/paste error. I can merge and make the test changes -- let me know if that sounds good, or if you'd like to make some more changes first.
The more you can take it over from here, the better :) But let me know if there is anything which is confusing, or if the TODOs that I've left actually don't seem possible etc. and I can take a closer look.
Will do. Thanks!
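For readers, the boilerplate-eliminating pattern being adopted can be sketched like this. The names (MetricGetter, the specific metrics) are modeled on the discussion; the getters here read from a plain map rather than Spark's MemoryManager, so this is an illustration of the pattern, not Spark's actual code.

```scala
// Each metric is one entry in a sealed list with its own getter, so peak
// tracking and serialization can loop over the list instead of hand-writing
// one comparison per field -- eliminating the copy/paste risk discussed above.
sealed abstract class MetricGetter(val name: String) {
  def getValue(state: Map[String, Long]): Long = state.getOrElse(name, 0L)
}
case object JVMHeapMemory extends MetricGetter("JVMHeapMemory")
case object OnHeapExecutionMemory extends MetricGetter("OnHeapExecutionMemory")
case object OnHeapStorageMemory extends MetricGetter("OnHeapStorageMemory")

object MetricGetter {
  // Fixed enumeration order: array index i always means values(i).
  val values: IndexedSeq[MetricGetter] =
    IndexedSeq(JVMHeapMemory, OnHeapExecutionMemory, OnHeapStorageMemory)

  /** Collect all metrics in enum order -- no per-metric copy/paste. */
  def collect(state: Map[String, Long]): Array[Long] =
    values.map(_.getValue(state)).toArray
}
```

Adding a metric then means adding one case object and one list entry; every consumer (peak tracking, logging, REST output) picks it up automatically.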
Metric enums
Test build #91424 has finished for PR 21221 at commit
ok to test
probably needs to be rebased
Test build #91642 has finished for PR 21221 at commit
@squito , I'm modifying ExecutorMetrics to take in the metrics array -- this will be easier for tests where we pass in set values, and seems fine for the actual code. It will check that the length of the passed in array is the same as MetricGetter.values.length. Let me know if you have any concerns. @felixcheung , I'll finish the current changes, then rebase.
@squito For PeakMemoryMetrics in api.scala, changing to the array gives REST API output of: "peakMemoryMetrics" : { instead of: "peakMemoryMetrics" : { Would it be OK to revert back to the original version of PeakMemoryMetrics, where each field is listed as a separate element?
well, I think you should change the way PeakExecutorMetrics gets converted to json, so that it uses a name from the relevant
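The suggestion amounts to serializing the peaks array by pairing each value with its metric name. A hand-rolled illustration (Spark's actual serialization goes through its Jackson-based API layer; the name list stands in for MetricGetter.values):

```scala
// Stand-in for the metric enumeration's names, in the same order as the
// peaks array -- index i of the array corresponds to metricNames(i).
val metricNames = IndexedSeq("JVMHeapMemory", "OnHeapExecutionMemory", "OnHeapStorageMemory")

// Emit named JSON fields from the metrics array, so the REST output shows
// "JVMHeapMemory" : ... rather than an anonymous array of numbers.
def peakMetricsToJson(peaks: Array[Long]): String = {
  val fields = metricNames.zip(peaks).map { case (name, v) => s""""$name" : $v""" }
  fields.mkString("{ ", ", ", " }")
}
```

This keeps the compact array representation internally while preserving the original, self-describing REST output.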
… move logic for getting metrics to Heartbeater), and modify tests for the new ExecutorMetrics format.
LGTM, minor comments
@@ -216,8 +217,7 @@ private[spark] class Executor(

def stop(): Unit = {
  env.metricsSystem.report()
  heartbeater.shutdown()
  heartbeater.awaitTermination(10, TimeUnit.SECONDS)
  heartbeater.stop()
Future: try {} catch { case NonFatal(e) }?
Added.
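The suggested pattern, sketched (logWarning is replaced by println, and stopFn stands in for the heartbeater shutdown calls; this is an illustration, not the PR's exact code):

```scala
import scala.util.control.NonFatal

// Wrap heartbeater shutdown so a non-fatal failure there doesn't abort the
// rest of executor teardown. NonFatal deliberately excludes errors like
// OutOfMemoryError, which should still propagate.
def stopHeartbeater(stopFn: () => Unit): Boolean = {
  try {
    stopFn()
    true
  } catch {
    case NonFatal(e) =>
      println(s"Unable to stop heartbeater: ${e.getMessage}")
      false
  }
}
```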
private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
  ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
    .booleanConf
    .createWithDefault(true)
should this be "false" for now until we could test this out more, just to be on the safe side?
That would be safer. I'll change it to false, and we can change to true after people have had a chance to test it out.
Jenkins, retest this please
Test build #94842 has finished for PR 21221 at commit
Test build #94865 has finished for PR 21221 at commit
We're going to delay merging this until after the 2.4 branch is cut. We can include this in Spark 2.5.
@edwinalu - this can merge now that Spark 2.4's release branch has been cut, but there are conflicting files now. Can we clear the conflicts and then merge?
Test build #95776 has finished for PR 21221 at commit
@@ -103,6 +103,12 @@ public final void onExecutorMetricsUpdate(
  onEvent(executorMetricsUpdate);
}

@Override
public final void onStageExecutorMetrics(
    SparkListenerStageExecutorMetrics executorMetrics) {
nit: remove extra spaces for better indent
Test build #95801 has finished for PR 21221 at commit
Thanks, I think this looks good. With a prior +1 from @felixcheung and @squito, I'm going to merge this now. Let us know if there are any further concerns and we can follow up.
Thanks!
For the other reviewers, this was merged to master (not 2.4).
@mccheah When you merge code, could you also leave a comment noting which branches you merged to?
### What changes were proposed in this pull request?

At Apache Spark 3.0.0, [SPARK-23429](#21221) added the ability to collect executor metrics via heartbeats and to expose them as a REST API. This PR aims to extend it to additionally support the `Prometheus` format.

### Why are the changes needed?

Prometheus.io is a CNCF project used widely with K8s.
- https://github.com/prometheus/prometheus

### Does this PR introduce any user-facing change?

Yes. New web interfaces are added along with the existing JSON API.

| | JSON End Point | Prometheus End Point |
| ------- | ------------------------------------ | --------------------------------- |
| Driver | /api/v1/applications/{id}/executors/ | /metrics/executors/prometheus/ |

### How was this patch tested?

Manually connect to the new end-points with `curl` and compare with JSON.

**SETUP**
```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://`hostname`:7077
$ bin/spark-shell --master spark://`hostname`:7077 --conf spark.ui.prometheus.enabled=true
```

**JSON (existing after SPARK-23429)**
```
$ curl -s http://localhost:4040/api/v1/applications/app-20190911204823-0000/executors
[ { "id" : "driver", "hostPort" : "localhost:52615", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "totalCores" : 0, "maxTasks" : 0, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalGCTime" : 0, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : false, "maxMemory" : 384093388, "addTime" : "2019-09-12T03:48:23.875GMT", "executorLogs" : { }, "memoryMetrics" : { "usedOnHeapStorageMemory" : 0, "usedOffHeapStorageMemory" : 0, "totalOnHeapStorageMemory" : 384093388, "totalOffHeapStorageMemory" : 0 }, "blacklistedInStages" : [ ], "peakMemoryMetrics" : { "JVMHeapMemory" : 229995952, "JVMOffHeapMemory" : 145872280, "OnHeapExecutionMemory" : 0, "OffHeapExecutionMemory" : 0, "OnHeapStorageMemory" : 0, "OffHeapStorageMemory" : 0, "OnHeapUnifiedMemory" : 0, "OffHeapUnifiedMemory" : 0, "DirectPoolMemory" : 75891, "MappedPoolMemory" : 0, "ProcessTreeJVMVMemory" : 0, "ProcessTreeJVMRSSMemory" : 0, "ProcessTreePythonVMemory" : 0, "ProcessTreePythonRSSMemory" : 0, "ProcessTreeOtherVMemory" : 0, "ProcessTreeOtherRSSMemory" : 0, "MinorGCCount" : 8, "MinorGCTime" : 82, "MajorGCCount" : 3, "MajorGCTime" : 128 }, "attributes" : { }, "resources" : { } },
  { "id" : "0", "hostPort" : "127.0.0.1:52619", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "totalCores" : 16, "maxTasks" : 16, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalGCTime" : 0, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "isBlacklisted" : false, "maxMemory" : 384093388, "addTime" : "2019-09-12T03:48:25.907GMT", "executorLogs" : { "stdout" : "http://127.0.0.1:8081/logPage/?appId=app-20190911204823-0000&executorId=0&logType=stdout", "stderr" : "http://127.0.0.1:8081/logPage/?appId=app-20190911204823-0000&executorId=0&logType=stderr" }, "memoryMetrics" : { "usedOnHeapStorageMemory" : 0, "usedOffHeapStorageMemory" : 0, "totalOnHeapStorageMemory" : 384093388, "totalOffHeapStorageMemory" : 0 }, "blacklistedInStages" : [ ], "attributes" : { }, "resources" : { } } ]
```

**Prometheus**
```
$ curl -s http://localhost:4040/metrics/executors/prometheus
metrics_app_20190911204823_0000_driver_executor_rddBlocks_Count 0
metrics_app_20190911204823_0000_driver_executor_memoryUsed_Count 0
metrics_app_20190911204823_0000_driver_executor_diskUsed_Count 0
metrics_app_20190911204823_0000_driver_executor_totalCores_Count 0
metrics_app_20190911204823_0000_driver_executor_maxTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_activeTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_failedTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_completedTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_totalTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_totalDuration_Value 0
metrics_app_20190911204823_0000_driver_executor_totalGCTime_Value 0
metrics_app_20190911204823_0000_driver_executor_totalInputBytes_Count 0
metrics_app_20190911204823_0000_driver_executor_totalShuffleRead_Count 0
metrics_app_20190911204823_0000_driver_executor_totalShuffleWrite_Count 0
metrics_app_20190911204823_0000_driver_executor_maxMemory_Count 384093388
metrics_app_20190911204823_0000_driver_executor_usedOnHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_usedOffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_totalOnHeapStorageMemory_Count 384093388
metrics_app_20190911204823_0000_driver_executor_totalOffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_JVMHeapMemory_Count 230406336
metrics_app_20190911204823_0000_driver_executor_JVMOffHeapMemory_Count 146132592
metrics_app_20190911204823_0000_driver_executor_OnHeapExecutionMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OffHeapExecutionMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OnHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OnHeapUnifiedMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OffHeapUnifiedMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_DirectPoolMemory_Count 97049
metrics_app_20190911204823_0000_driver_executor_MappedPoolMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeJVMVMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeJVMRSSMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreePythonVMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreePythonRSSMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeOtherVMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeOtherRSSMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_MinorGCCount_Count 8
metrics_app_20190911204823_0000_driver_executor_MinorGCTime_Count 82
metrics_app_20190911204823_0000_driver_executor_MajorGCCount_Count 3
metrics_app_20190911204823_0000_driver_executor_MajorGCTime_Count 128
metrics_app_20190911204823_0000_0_executor_rddBlocks_Count 0
metrics_app_20190911204823_0000_0_executor_memoryUsed_Count 0
metrics_app_20190911204823_0000_0_executor_diskUsed_Count 0
metrics_app_20190911204823_0000_0_executor_totalCores_Count 16
metrics_app_20190911204823_0000_0_executor_maxTasks_Count 16
metrics_app_20190911204823_0000_0_executor_activeTasks_Count 0
metrics_app_20190911204823_0000_0_executor_failedTasks_Count 0
metrics_app_20190911204823_0000_0_executor_completedTasks_Count 0
metrics_app_20190911204823_0000_0_executor_totalTasks_Count 0
metrics_app_20190911204823_0000_0_executor_totalDuration_Value 0
metrics_app_20190911204823_0000_0_executor_totalGCTime_Value 0
metrics_app_20190911204823_0000_0_executor_totalInputBytes_Count 0
metrics_app_20190911204823_0000_0_executor_totalShuffleRead_Count 0
metrics_app_20190911204823_0000_0_executor_totalShuffleWrite_Count 0
metrics_app_20190911204823_0000_0_executor_maxMemory_Count 384093388
metrics_app_20190911204823_0000_0_executor_usedOnHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_0_executor_usedOffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_0_executor_totalOnHeapStorageMemory_Count 384093388
metrics_app_20190911204823_0000_0_executor_totalOffHeapStorageMemory_Count 0
```

Closes #25770 from dongjoon-hyun/SPARK-29064.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
Add new executor-level memory metrics (JVM used memory, on/off-heap execution memory, on/off-heap storage memory, on/off-heap unified memory, direct memory, and mapped memory), and expose them via the executors REST API. This information provides insight into how executor and driver JVM memory is used across the different memory regions, and can help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.
What changes were proposed in this pull request?
An ExecutorMetrics class is added, with jvmUsedHeapMemory, jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, offHeapStorageMemory, onHeapUnifiedMemory, offHeapUnifiedMemory, directMemory, and mappedMemory fields. The new ExecutorMetrics is sent by executors to the driver as part of the heartbeat. A heartbeat is added for the driver as well, to collect these metrics for the driver.
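A minimal sketch of such a metrics snapshot (field names follow this description; Spark's real class lives in the `org.apache.spark.executor` package and differs in detail). Modeling unified memory as execution plus storage within each region is an assumption made here for illustration:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ExecutorMetricsSketch:
    """Illustrative snapshot of executor-level memory metrics, in bytes."""
    jvm_used_heap_memory: int
    jvm_used_non_heap_memory: int
    on_heap_execution_memory: int
    off_heap_execution_memory: int
    on_heap_storage_memory: int
    off_heap_storage_memory: int
    direct_memory: int
    mapped_memory: int

    @property
    def on_heap_unified_memory(self) -> int:
        # Assumption: unified = execution + storage within the on-heap region.
        return self.on_heap_execution_memory + self.on_heap_storage_memory

    @property
    def off_heap_unified_memory(self) -> int:
        # Same assumption for the off-heap region.
        return self.off_heap_execution_memory + self.off_heap_storage_memory
```

An executor would attach one such snapshot to each heartbeat it sends to the driver.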
The EventLoggingListener stores the peak values for each metric, per active stage and executor. When a StageCompleted event is seen, a StageExecutorsMetrics event is logged for each executor, with the peak values for that stage.
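The peak-tracking bookkeeping described above can be sketched as follows (a simplification, not Spark's actual EventLoggingListener code): keep the element-wise maximum per (stage, executor) pair across heartbeats, and flush the peaks when the stage completes.

```python
from collections import defaultdict

class PeakMetricsTracker:
    """Sketch of per-stage, per-executor peak tracking for memory metrics."""

    def __init__(self):
        # (stage_id, executor_id) -> {metric_name: peak_value seen so far}
        self.peaks = defaultdict(dict)

    def on_heartbeat(self, stage_id, executor_id, metrics):
        """Fold a heartbeat's metric values into the running peaks."""
        peak = self.peaks[(stage_id, executor_id)]
        for name, value in metrics.items():
            if value > peak.get(name, float("-inf")):
                peak[name] = value

    def on_stage_completed(self, stage_id):
        """Return (and drop) the per-executor peaks for the finished stage;
        a real listener would log one event per executor at this point."""
        finished = {k: v for k, v in self.peaks.items() if k[0] == stage_id}
        for k in finished:
            del self.peaks[k]
        return finished
```

Logging only these per-stage peaks, rather than every heartbeat update, is what keeps the extra event-log volume small.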
The AppStatusListener records the peak values for each memory metric.
The new memory metrics are added to the executors REST API.
How was this patch tested?
New unit tests have been added. This was also tested on our cluster.