[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API #21221

Status: Closed. This pull request wants to merge 31 commits.

Commits
c8e8abe
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
5d6ae1c
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
ad10d28
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
10ed328
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
2d20367
wip on enum based metrics
squito May 23, 2018
f904f1e
wip ... has both enum and non-enum version
squito May 23, 2018
c502ec4
case objects, mostly complete
squito May 23, 2018
7879e66
Merge pull request #1 from squito/metric_enums
edwinalu Jun 3, 2018
2662f6f
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
2871335
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
da83f2e
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
f25a44b
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
ca85c82
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
8b74ba8
wip on enum based metrics
squito May 23, 2018
036148c
wip ... has both enum and non-enum version
squito May 23, 2018
91fb1db
case objects, mostly complete
squito May 23, 2018
2d8894a
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
99044e6
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
edwinalu Jun 14, 2018
263c8c8
code review comments
edwinalu Jun 14, 2018
812fdcf
code review comments:
edwinalu Jun 22, 2018
7ed42a5
Address code review comments. Also make executorUpdates in SparkListe…
edwinalu Jun 28, 2018
8d9acdf
Revert and make executorUpdates in SparkListenerExecutorMetricsUpdate…
edwinalu Jun 29, 2018
20799d2
code review comments: hid array implementation of executor metrics, a…
edwinalu Jul 25, 2018
8905d23
merge with master
edwinalu Jul 25, 2018
a0eed11
address code review comments
edwinalu Aug 5, 2018
03cd5bc
code review comments
edwinalu Aug 13, 2018
10e7f15
Merge branch 'master' into SPARK-23429.2
edwinalu Aug 14, 2018
a14b82a
merge conflicts
edwinalu Aug 14, 2018
2897281
disable stage executor metrics logging by default
edwinalu Aug 16, 2018
ee4aa1d
Merge branch 'master' into SPARK-23429.2
edwinalu Sep 6, 2018
571285b
fix indentation
edwinalu Sep 7, 2018
@@ -103,6 +103,12 @@ public final void onExecutorMetricsUpdate(
onEvent(executorMetricsUpdate);
}

@Override
public final void onStageExecutorMetrics(
SparkListenerStageExecutorMetrics executorMetrics) {
onEvent(executorMetrics);
}

@Override
public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
onEvent(executorAdded);
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
@@ -37,7 +38,8 @@ import org.apache.spark.util._
private[spark] case class Heartbeat(
executorId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
blockManagerId: BlockManagerId)
blockManagerId: BlockManagerId,
executorUpdates: ExecutorMetrics) // executor level updates

/**
* An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
@@ -119,14 +121,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)

// Messages received from executors
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorMetrics) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, accumUpdates, blockManagerId)
executorId, accumUpdates, blockManagerId, executorMetrics)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
71 changes: 71 additions & 0 deletions core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.util.concurrent.TimeUnit

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Creates a heartbeat thread which will call the specified reportHeartbeat function at
* intervals of intervalMs.
*
* @param memoryManager the memory manager for execution and storage memory.
* @param reportHeartbeat the heartbeat reporting function to call.
* @param name the thread name for the heartbeater.
* @param intervalMs the interval between heartbeats.
*/
private[spark] class Heartbeater(
memoryManager: MemoryManager,
reportHeartbeat: () => Unit,
name: String,
intervalMs: Long) extends Logging {
// Executor for the heartbeat task
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)

/** Schedules a task to report a heartbeat. */
def start(): Unit = {
// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartbeat())
}
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}

/** Stops the heartbeat thread. */
def stop(): Unit = {
heartbeater.shutdown()
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
}

/**
* Get the current executor-level metrics. These are returned as an array, with the index
* determined by ExecutorMetricType.values.
*/
def getCurrentMetrics(): ExecutorMetrics = {
val metrics = ExecutorMetricType.values.map(_.getMetricValue(memoryManager)).toArray
new ExecutorMetrics(metrics)
}
}
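A minimal usage sketch of the Heartbeater above (hypothetical; the report function, thread name, and interval are illustrative, and the class is private[spark], so this only applies inside Spark itself):

// Hypothetical caller inside org.apache.spark; memoryManager is assumed to be in scope.
val exampleHeartbeater = new Heartbeater(
  memoryManager,
  () => println("heartbeat fired"),  // placeholder report function
  "example-heartbeater",
  10000L)                            // 10 second interval, illustrative
exampleHeartbeater.start()
// ... on shutdown:
exampleHeartbeater.stop()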

20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -213,6 +213,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -496,6 +497,11 @@
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// create and start the heartbeater for collecting memory metrics
_heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
_heartbeater.start()

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
@@ -1959,6 +1965,12 @@
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
if (_heartbeater != null) {
Utils.tryLogNonFatalError {
_heartbeater.stop()
}
_heartbeater = null
}
if (env != null && _heartbeatReceiver != null) {
Utils.tryLogNonFatalError {
env.rpcEnv.stop(_heartbeatReceiver)
@@ -2429,6 +2441,14 @@
}
}

/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
val driverUpdates = _heartbeater.getCurrentMetrics()
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
Some(driverUpdates)))
}

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
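For context, a hedged sketch of a SparkListener that would observe the driver-level updates posted by reportHeartBeat above (the listener name is hypothetical; executorUpdates is the Option[ExecutorMetrics] this PR adds to SparkListenerExecutorMetricsUpdate):

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Illustrative listener, not part of this PR.
class MemoryMetricsListener extends SparkListener {
  override def onExecutorMetricsUpdate(
      update: SparkListenerExecutorMetricsUpdate): Unit = {
    // execId is "driver" for the updates posted by SparkContext.reportHeartBeat.
    update.executorUpdates.foreach { metrics =>
      println(s"Received executor metrics update from ${update.execId}")
    }
  }
}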
36 changes: 15 additions & 21 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
@@ -148,7 +148,8 @@ private[spark] class Executor(
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
"executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))

// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =
@@ -167,7 +168,7 @@
*/
private var heartbeatFailures = 0

startDriverHeartbeater()
heartbeater.start()

private[executor] def numRunningTasks: Int = runningTasks.size()

@@ -216,8 +217,12 @@

def stop(): Unit = {
env.metricsSystem.report()
heartbeater.shutdown()
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
try {
heartbeater.stop()
} catch {
case NonFatal(e) =>
logWarning("Unable to stop heartbeater", e)
}
threadPool.shutdown()
if (!isLocal) {
env.stop()
@@ -787,6 +792,9 @@
val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
val curGCTime = computeTotalGcTime()

// get executor level memory metrics
val executorUpdates = heartbeater.getCurrentMetrics()

for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.mergeShuffleReadMetrics()
@@ -795,7 +803,8 @@
}
}

val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId,
executorUpdates)
try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
@@ -815,21 +824,6 @@
}
}
}

/**
* Schedules a task to report heartbeat and partial metrics for active tasks to driver.
*/
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
}
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
}

private[spark] object Executor {
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.metrics.ExecutorMetricType

/**
* :: DeveloperApi ::
* Metrics tracked for executors and the driver.
*
* Executor-level metrics are sent from each executor to the driver as part of the Heartbeat.
*/
@DeveloperApi
class ExecutorMetrics private[spark] extends Serializable {

// Metrics are indexed by ExecutorMetricType.values
private val metrics = new Array[Long](ExecutorMetricType.values.length)
Contributor:
Out of curiosity, why are we using an array here with index-based fetching? We could use a struct / case class to represent these metrics. But I suppose the size of the payload we send is smaller if we use an Array, and we don't want to pay serialization costs?

Contributor:
I suggested this earlier in the reviews. Most of the operations for dealing with this data want to iterate over all the fields; it's much easier this way than having a bazillion

if (x.fizz > y.fizz) {
  y.fizz = x.fizz
}
if (x.buzz > y.buzz) {
  y.buzz = x.buzz
}
...

Contributor:
Yup, that's fine. I did some googling, and unfortunately there isn't a great way to iterate over the fields of a case class. You could create a thin wrapper object around the array instead, though, if we really think the nicer API is worthwhile:

case class Metrics(values: Seq[Long]) {
  def someMetric1(): Long = values(0)
  def ....
  def ...
}

Or even this:

case class Metrics(metric1: Long, metric2: Long, metric3: Long, ...) {
  def values(): Seq[Long] = Seq(metric1, metric2, metric3, ...)
}

The latter would be better because you'd be guaranteed to create the struct with the right number of metrics. Though such abstractions are not necessary by any means.

Contributor Author:
Is it likely that users would want to access the individual fields, rather than iterating through all of them? The first option would be a bit nicer if so.

Contributor:
Unclear. If we expose these metrics to some external consumer via an API, for example, then we almost certainly want a schema labelling these fields for consumption by e.g. dashboards. I think what we have here is fine for now.

// the first element is initialized to -1, indicating that the values for the array
// haven't been set yet.
metrics(0) = -1

/** Returns the value for the specified metricType. */
def getMetricValue(metricType: ExecutorMetricType): Long = {
metrics(ExecutorMetricType.metricIdxMap(metricType))
}

/** Returns true if the values for the metrics have been set, false otherwise. */
def isSet(): Boolean = metrics(0) > -1

private[spark] def this(metrics: Array[Long]) {
this()
Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size))
}

/**
* Constructor: create the ExecutorMetrics with the values specified.
*
* @param executorMetrics map of executor metric name to value
*/
private[spark] def this(executorMetrics: Map[String, Long]) {
this()
(0 until ExecutorMetricType.values.length).foreach { idx =>
metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
}
}

/**
* Compare the specified executor metrics values with the current executor metric values,
* and update the value for any metrics where the new value for the metric is larger.
*
* @param executorMetrics the executor metrics to compare
* @return if there is a new peak value for any metric
*/
private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = {
var updated = false

(0 until ExecutorMetricType.values.length).foreach { idx =>
if (executorMetrics.metrics(idx) > metrics(idx)) {
updated = true
metrics(idx) = executorMetrics.metrics(idx)
}
}
updated
}
}
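A short usage sketch of the peak-tracking API above (hypothetical; the metric name "JVMHeapMemory" and the values are assumptions, not taken from this diff, and the constructors used are private[spark]):

// Illustrative only; assumes "JVMHeapMemory" is one of the ExecutorMetricType names.
val peaks = new ExecutorMetrics(Map("JVMHeapMemory" -> 500L))
val latest = new ExecutorMetrics(Map("JVMHeapMemory" -> 750L))

// Updates `peaks` in place for every metric where `latest` is larger, and reports whether
// anything changed (true here, since 750 > 500).
val changed = peaks.compareAndUpdatePeakValues(latest)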
@@ -69,6 +69,11 @@ package object config {
.bytesConf(ByteUnit.KiB)
.createWithDefaultString("100k")

private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
.booleanConf
.createWithDefault(false)

private[spark] val EVENT_LOG_OVERWRITE =
ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)

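A hedged example of turning the new flag on alongside event logging (spark.eventLog.enabled and the use of SparkConf here are assumptions about surrounding configuration, not part of this diff):

import org.apache.spark.SparkConf

// Illustrative configuration sketch; everything except the new flag is assumed.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")  // added by this PR, default false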
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -180,6 +180,34 @@ private[spark] abstract class MemoryManager(
onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
}

/**
* On heap execution memory currently in use, in bytes.
*/
final def onHeapExecutionMemoryUsed: Long = synchronized {
onHeapExecutionMemoryPool.memoryUsed
}

/**
* Off heap execution memory currently in use, in bytes.
*/
final def offHeapExecutionMemoryUsed: Long = synchronized {
offHeapExecutionMemoryPool.memoryUsed
}

/**
* On heap storage memory currently in use, in bytes.
*/
final def onHeapStorageMemoryUsed: Long = synchronized {
onHeapStorageMemoryPool.memoryUsed
}

/**
* Off heap storage memory currently in use, in bytes.
*/
final def offHeapStorageMemoryUsed: Long = synchronized {
offHeapStorageMemoryPool.memoryUsed
}

/**
* Returns the execution memory consumption, in bytes, for the given task.
*/
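To illustrate how these accessors might be consumed, a hedged sketch of a metric type in the style of the ExecutorMetricType hook that Heartbeater.getCurrentMetrics calls; the trait shape and case-object name below are assumptions, since ExecutorMetricType itself is defined elsewhere in this PR:

import org.apache.spark.memory.MemoryManager

// Illustrative only; the real ExecutorMetricType is not shown in this file.
sealed trait ExampleMetricType {
  def name: String
  def getMetricValue(memoryManager: MemoryManager): Long
}

case object OnHeapExecutionMemory extends ExampleMetricType {
  val name = "OnHeapExecutionMemory"
  // Reads the on-heap execution accessor added in the hunk above.
  def getMetricValue(memoryManager: MemoryManager): Long =
    memoryManager.onHeapExecutionMemoryUsed
}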