Commit 9241e1e

edwinalu authored and mccheah committed
[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API
Add new executor-level memory metrics (JVM used memory, on/off-heap execution memory, on/off-heap storage memory, on/off-heap unified memory, direct memory, and mapped memory), and expose them via the executors REST API. This information will help provide insight into how executor and driver JVM memory is used, and for the different memory regions. It can be used to help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.

## What changes were proposed in this pull request?

An ExecutorMetrics class is added, with jvmUsedHeapMemory, jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, offHeapStorageMemory, onHeapUnifiedMemory, offHeapUnifiedMemory, directMemory, and mappedMemory. The new ExecutorMetrics is sent by executors to the driver as part of the Heartbeat. A heartbeat is added for the driver as well, to collect these metrics for the driver.

The EventLoggingListener stores information about the peak values for each metric, per active stage and executor. When a StageCompleted event is seen, a StageExecutorMetrics event will be logged for each executor, with peak values for the stage.

The AppStatusListener records the peak values for each memory metric.

The new memory metrics are added to the executors REST API.

## How was this patch tested?

New unit tests have been added. This was also tested on our cluster.

Author: Edwina Lu <edlu@linkedin.com>
Author: Imran Rashid <irashid@cloudera.com>
Author: edwinalu <edwina.lu@gmail.com>

Closes #21221 from edwinalu/SPARK-23429.2.
1 parent 458f501 commit 9241e1e

36 files changed: +1531 additions, -83 deletions
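
The most visible effect of the change is the new data in the executors REST API. Below is a minimal, hedged sketch of polling that endpoint from outside the application; the host, port, and application ID are placeholders, and `peakMemoryMetrics` is the field this patch describes adding to each executor summary.

```scala
import scala.io.Source

// Fetch the executor summaries for one application and dump the raw JSON.
// localhost:4040 and the appId are hypothetical; inspect the
// "peakMemoryMetrics" object on each summary for the new peak values.
object PollExecutorMetrics {
  def main(args: Array[String]): Unit = {
    val appId = args.headOption.getOrElse("app-00000000000000-0000") // placeholder
    val url = s"http://localhost:4040/api/v1/applications/$appId/executors"
    println(Source.fromURL(url).mkString)
  }
}
```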

core/src/main/java/org/apache/spark/SparkFirehoseListener.java

Lines changed: 6 additions & 0 deletions
@@ -103,6 +103,12 @@ public final void onExecutorMetricsUpdate(
     onEvent(executorMetricsUpdate);
   }
 
+  @Override
+  public final void onStageExecutorMetrics(
+      SparkListenerStageExecutorMetrics executorMetrics) {
+    onEvent(executorMetrics);
+  }
+
   @Override
   public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
     onEvent(executorAdded);
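
SparkFirehoseListener just forwards the new event to onEvent; a custom listener can act on it directly. A sketch under two assumptions: the event exposes execId, stageId, and executorMetrics fields (consistent with this patch's description, but not shown in this diff), and each ExecutorMetricType has a name field (visible later in the ExecutorMetrics diff).

```scala
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageExecutorMetrics}

// Print every per-stage peak metric as the event is delivered (live or on
// event-log replay). Event field names are assumptions, as noted above.
class StagePeakLogger extends SparkListener {
  override def onStageExecutorMetrics(event: SparkListenerStageExecutorMetrics): Unit = {
    ExecutorMetricType.values.foreach { t =>
      println(s"stage=${event.stageId} exec=${event.execId} " +
        s"${t.name}=${event.executorMetrics.getMetricValue(t)}")
    }
  }
}
```

Such a listener could be registered through spark.extraListeners.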

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 5 additions & 3 deletions
@@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
 import scala.collection.mutable
 import scala.concurrent.Future
 
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
@@ -37,7 +38,8 @@ import org.apache.spark.util._
 private[spark] case class Heartbeat(
     executorId: String,
     accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
-    blockManagerId: BlockManagerId)
+    blockManagerId: BlockManagerId,
+    executorUpdates: ExecutorMetrics) // executor level updates
 
 /**
  * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
@@ -119,14 +121,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
       context.reply(true)
 
     // Messages received from executors
-    case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
+    case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorMetrics) =>
       if (scheduler != null) {
         if (executorLastSeen.contains(executorId)) {
           executorLastSeen(executorId) = clock.getTimeMillis()
           eventLoopThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               val unknownExecutor = !scheduler.executorHeartbeatReceived(
-                executorId, accumUpdates, blockManagerId)
+                executorId, accumUpdates, blockManagerId, executorMetrics)
               val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
               context.reply(response)
             }
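
The protocol change is purely additive: Heartbeat grows a fourth field, so every pattern match on the message has to be widened in the same commit, as above. A standalone sketch of that shape, using simplified stand-in types rather than Spark's own classes:

```scala
// Stand-ins for the real message types; only the arity change matters here.
case class ExecutorMetricsStub(metrics: Array[Long])
case class HeartbeatStub(
    executorId: String,
    accumUpdates: Array[(Long, Seq[Long])],
    blockManagerId: String,
    executorUpdates: ExecutorMetricsStub) // the field this patch adds

def receive(msg: Any): Unit = msg match {
  // Each extractor must now bind (or ignore) the fourth field.
  case HeartbeatStub(execId, _, _, updates) =>
    println(s"$execId reported ${updates.metrics.length} metric values")
  case _ =>
}
```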
core/src/main/scala/org/apache/spark/Heartbeater.scala

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryManager
+import org.apache.spark.metrics.ExecutorMetricType
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param memoryManager the memory manager for execution and storage memory.
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param name the thread name for the heartbeater.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(
+    memoryManager: MemoryManager,
+    reportHeartbeat: () => Unit,
+    name: String,
+    intervalMs: Long) extends Logging {
+  // Executor for the heartbeat task
+  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)
+
+  /** Schedules a task to report a heartbeat. */
+  def start(): Unit = {
+    // Wait a random interval so the heartbeats don't end up in sync
+    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
+
+    val heartbeatTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartbeat())
+    }
+    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
+  }
+
+  /** Stops the heartbeat thread. */
+  def stop(): Unit = {
+    heartbeater.shutdown()
+    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  /**
+   * Get the current executor level metrics. These are returned as an array, with the index
+   * determined by ExecutorMetricType.values.
+   */
+  def getCurrentMetrics(): ExecutorMetrics = {
+    val metrics = ExecutorMetricType.values.map(_.getMetricValue(memoryManager)).toArray
+    new ExecutorMetrics(metrics)
+  }
+}
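
Heartbeater factors out the scheduling pattern that was previously inlined in Executor (see the Executor.scala diff below). A self-contained sketch of the same pattern outside Spark, using only java.util.concurrent:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// One scheduled thread, a fixed interval, and a randomized initial delay so
// processes launched together do not heartbeat in lockstep.
object HeartbeaterSketch {
  def main(args: Array[String]): Unit = {
    val intervalMs = 1000L
    val initialDelay = intervalMs + (math.random * intervalMs).toLong
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val beat = new Runnable {
      override def run(): Unit = println(s"heartbeat @ ${System.currentTimeMillis()}")
    }
    scheduler.scheduleAtFixedRate(beat, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
    Thread.sleep(5000) // let a few beats fire
    scheduler.shutdown()
    scheduler.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```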

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 20 additions & 0 deletions
@@ -213,6 +213,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _files: Seq[String] = _
   private var _shutdownHookRef: AnyRef = _
   private var _statusStore: AppStatusStore = _
+  private var _heartbeater: Heartbeater = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -496,6 +497,11 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = new DAGScheduler(this)
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+    // create and start the heartbeater for collecting memory metrics
+    _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
+      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+    _heartbeater.start()
+
     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
     // constructor
     _taskScheduler.start()
@@ -1959,6 +1965,12 @@ class SparkContext(config: SparkConf) extends Logging {
       Utils.tryLogNonFatalError {
         _eventLogger.foreach(_.stop())
       }
+      if (_heartbeater != null) {
+        Utils.tryLogNonFatalError {
+          _heartbeater.stop()
+        }
+        _heartbeater = null
+      }
       if (env != null && _heartbeatReceiver != null) {
         Utils.tryLogNonFatalError {
           env.rpcEnv.stop(_heartbeatReceiver)
@@ -2429,6 +2441,14 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }
 
+  /** Reports heartbeat metrics for the driver. */
+  private def reportHeartBeat(): Unit = {
+    val driverUpdates = _heartbeater.getCurrentMetrics()
+    val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
+    listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
+      Some(driverUpdates)))
+  }
+
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having finished construction.
   // NOTE: this must be placed at the end of the SparkContext constructor.
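
Because the driver now posts its own metrics to the listener bus as SparkListenerExecutorMetricsUpdate("driver", ...), any listener can observe them. A sketch; the diff shows the event gaining an Option[ExecutorMetrics] third argument, but the executorUpdates field name on the event class is an assumption:

```scala
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Print the driver's memory metrics each time its heartbeat fires.
class DriverHeartbeatLogger extends SparkListener {
  override def onExecutorMetricsUpdate(
      event: SparkListenerExecutorMetricsUpdate): Unit = {
    if (event.execId == "driver") {
      event.executorUpdates.foreach { m => // assumed field name, see above
        ExecutorMetricType.values.foreach { t =>
          println(s"driver ${t.name} = ${m.getMetricValue(t)}")
        }
      }
    }
  }
}
```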

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 15 additions & 21 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rpc.RpcTimeout
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
+import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
@@ -148,7 +148,8 @@ private[spark] class Executor(
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
   // Executor for the heartbeat task.
-  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
+  private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
+    "executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
 
   // must be initialized before running startDriverHeartbeat()
   private val heartbeatReceiverRef =
@@ -167,7 +168,7 @@ private[spark] class Executor(
    */
   private var heartbeatFailures = 0
 
-  startDriverHeartbeater()
+  heartbeater.start()
 
   private[executor] def numRunningTasks: Int = runningTasks.size()
 
@@ -216,8 +217,12 @@ private[spark] class Executor(
 
   def stop(): Unit = {
     env.metricsSystem.report()
-    heartbeater.shutdown()
-    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+    try {
+      heartbeater.stop()
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Unable to stop heartbeater", e)
+    }
     threadPool.shutdown()
     if (!isLocal) {
       env.stop()
@@ -787,6 +792,9 @@ private[spark] class Executor(
     val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
     val curGCTime = computeTotalGcTime()
 
+    // get executor level memory metrics
+    val executorUpdates = heartbeater.getCurrentMetrics()
+
     for (taskRunner <- runningTasks.values().asScala) {
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()
@@ -795,7 +803,8 @@ private[spark] class Executor(
       }
     }
 
-    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
+    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId,
+      executorUpdates)
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
         message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
@@ -815,21 +824,6 @@ private[spark] class Executor(
     }
   }
 
-  /**
-   * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
-   */
-  private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
-
-    // Wait a random interval so the heartbeats don't end up in sync
-    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
-
-    val heartbeatTask = new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
-    }
-    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
-  }
 }
 
 private[spark] object Executor {
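
Note the hardening in stop(): a failure to stop the heartbeater is downgraded to a warning so the rest of shutdown still runs. The pattern in isolation, as a generic sketch:

```scala
import scala.util.control.NonFatal

// Run a cleanup step, logging non-fatal failures instead of propagating them
// so subsequent shutdown steps still execute. Fatal errors still escape.
def stopQuietly(step: () => Unit, warn: (String, Throwable) => Unit): Unit = {
  try {
    step()
  } catch {
    case NonFatal(e) => warn("Unable to stop heartbeater", e)
  }
}

stopQuietly(() => throw new IllegalStateException("already stopped"),
  (msg, e) => Console.err.println(s"WARN $msg: $e"))
```
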
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by ExecutorMetricType.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+    metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+    this()
+    Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+    this()
+    (0 until ExecutorMetricType.values.length).foreach { idx =>
+      metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+    }
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current executor metric values,
+   * and update the value for any metrics where the new value for the metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = {
+    var updated = false
+    (0 until ExecutorMetricType.values.length).foreach { idx =>
+      if (executorMetrics.metrics(idx) > metrics(idx)) {
+        updated = true
+        metrics(idx) = executorMetrics.metrics(idx)
+      }
+    }
+    updated
+  }
+}
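
compareAndUpdatePeakValues is an element-wise running maximum plus a "did anything grow?" flag, which is what lets listeners act only when a new peak appears. The same contract on bare arrays, as a quick illustration:

```scala
// Element-wise running maximum; returns true iff at least one slot grew.
def compareAndUpdatePeaks(peaks: Array[Long], sample: Array[Long]): Boolean = {
  var updated = false
  peaks.indices.foreach { i =>
    if (sample(i) > peaks(i)) {
      peaks(i) = sample(i)
      updated = true
    }
  }
  updated
}

val peaks = Array(100L, 50L)
println(compareAndUpdatePeaks(peaks, Array(90L, 80L))) // true: second slot grew
println(peaks.mkString(", "))                          // 100, 80
```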

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 5 additions & 0 deletions
@@ -69,6 +69,11 @@ package object config {
     .bytesConf(ByteUnit.KiB)
     .createWithDefaultString("100k")
 
+  private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
+    ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)

core/src/main/scala/org/apache/spark/memory/MemoryManager.scala

Lines changed: 28 additions & 0 deletions
@@ -180,6 +180,34 @@ private[spark] abstract class MemoryManager(
     onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   * On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = synchronized {
+    onHeapExecutionMemoryPool.memoryUsed
+  }
+
+  /**
+   * Off heap execution memory currently in use, in bytes.
+   */
+  final def offHeapExecutionMemoryUsed: Long = synchronized {
+    offHeapExecutionMemoryPool.memoryUsed
+  }
+
+  /**
+   * On heap storage memory currently in use, in bytes.
+   */
+  final def onHeapStorageMemoryUsed: Long = synchronized {
+    onHeapStorageMemoryPool.memoryUsed
+  }
+
+  /**
+   * Off heap storage memory currently in use, in bytes.
+   */
+  final def offHeapStorageMemoryUsed: Long = synchronized {
+    offHeapStorageMemoryPool.memoryUsed
+  }
+
   /**
    * Returns the execution memory consumption, in bytes, for the given task.
    */
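
These four accessors give ExecutorMetricType a uniform way to sample the MemoryManager. One possible shape for that mapping, sketched here since the real getters in org.apache.spark.metrics.ExecutorMetricType are not part of this diff (and MemoryManager is private[spark], so this only compiles inside that package):

```scala
import org.apache.spark.memory.MemoryManager

// Hypothetical name-to-reading table over the new accessors.
final case class MemoryMetric(name: String, read: MemoryManager => Long)

val memoryManagerMetrics: Seq[MemoryMetric] = Seq(
  MemoryMetric("OnHeapExecutionMemory", _.onHeapExecutionMemoryUsed),
  MemoryMetric("OffHeapExecutionMemory", _.offHeapExecutionMemoryUsed),
  MemoryMetric("OnHeapStorageMemory", _.onHeapStorageMemoryUsed),
  MemoryMetric("OffHeapStorageMemory", _.offHeapStorageMemoryUsed)
)
```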
