Commit 80ab19b

wypoon authored and squito committed
[SPARK-26329][CORE] Faster polling of executor memory metrics.
## What changes were proposed in this pull request?

Prior to this change, an executor polled its memory metrics on each heartbeat and sent them in that heartbeat; the heartbeat interval is 10s by default. With this change, an executor can optionally poll memory metrics in a separate poller at a shorter interval.

For each executor, we use a map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) to track which stages are active as well as the per-stage memory metric peaks. When polling the executor memory metrics, we attribute the memory to the active stage(s) and update the peaks. In a heartbeat, we send the per-stage peaks (for stages active at that time) and then reset them, so the per-stage peaks sent in each heartbeat are the peaks since the last heartbeat.

We also keep a map of taskId to memory metric peaks, which tracks the peaks over the lifetime of the task; the polling thread updates this map as well. At the end of a task, we send the peak metric values in the task result. In case of task failure, we send the peak metric values in the `TaskFailedReason`.

We continue to do the stage-level aggregation in the EventLoggingListener.

For the driver, we still poll only on heartbeats; the driver sends the current values of its metrics at the time of the heartbeat. This is semantically the same as before.

## How was this patch tested?

Unit tests. Manually tested applications on an actual system and checked the event logs; the metrics appear in the SparkListenerTaskEnd and SparkListenerStageExecutorMetrics events.

Closes #23767 from wypoon/wypoon_SPARK-26329.

Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
1 parent 26d03b6 commit 80ab19b
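
The ExecutorMetricsPoller class that implements the bookkeeping described in the commit message is among the files hidden from this truncated view. The following is a rough sketch of that bookkeeping under stated assumptions — every name and type here is illustrative, not the committed implementation:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}

import scala.collection.JavaConverters._

// Hypothetical sketch only: the real ExecutorMetricsPoller is hidden in this
// truncated view, so every name below is illustrative.
class MetricsPollerSketch(numMetrics: Int, currentMetrics: () => Array[Long]) {
  // (stageId, stageAttemptId) -> (running task count, metric peaks since the last heartbeat)
  private val stageMetricPeaks =
    new ConcurrentHashMap[(Int, Int), (AtomicLong, AtomicLongArray)]
  // taskId -> metric peaks over the lifetime of the task
  private val taskMetricPeaks = new ConcurrentHashMap[Long, AtomicLongArray]

  // Runs at the (shorter) polling interval: attribute the current metric values
  // to every active stage and running task, keeping a per-metric maximum.
  def poll(): Unit = {
    val sample = currentMetrics()
    val allPeaks =
      stageMetricPeaks.values.asScala.map(_._2) ++ taskMetricPeaks.values.asScala
    allPeaks.foreach { peaks =>
      (0 until numMetrics).foreach { i =>
        var prev = peaks.get(i)
        // Lock-free max update: retry the CAS if another thread raced us.
        while (sample(i) > prev && !peaks.compareAndSet(i, prev, sample(i))) {
          prev = peaks.get(i)
        }
      }
    }
  }
}
```

In the visible Executor.scala diff below, onTaskStart and onTaskCompletion are the hooks that would add and retire entries in such maps, and getExecutorUpdates would snapshot and reset the per-stage peaks for a heartbeat.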

File tree: 49 files changed, +1205 −1484 lines changed

Some content is hidden: large commits have part of their diff collapsed by default, so only a subset of the 49 changed files appears below.


core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 8 additions & 6 deletions
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
-import scala.collection.mutable
+import scala.collection.mutable.{HashMap, Map}
 import scala.concurrent.Future
 
 import org.apache.spark.executor.ExecutorMetrics
@@ -38,9 +38,11 @@ import org.apache.spark.util._
  */
 private[spark] case class Heartbeat(
     executorId: String,
-    accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
+    // taskId -> accumulator updates
+    accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
     blockManagerId: BlockManagerId,
-    executorUpdates: ExecutorMetrics) // executor level updates
+    // (stageId, stageAttemptId) -> executor metric peaks
+    executorUpdates: Map[(Int, Int), ExecutorMetrics])
 
 /**
  * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
@@ -73,7 +75,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   private[spark] var scheduler: TaskScheduler = null
 
   // executor ID -> timestamp of when the last heartbeat from this executor was received
-  private val executorLastSeen = new mutable.HashMap[String, Long]
+  private val executorLastSeen = new HashMap[String, Long]
 
   private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
 
@@ -120,14 +122,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
       context.reply(true)
 
     // Messages received from executors
-    case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorMetrics) =>
+    case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorUpdates) =>
       if (scheduler != null) {
         if (executorLastSeen.contains(executorId)) {
           executorLastSeen(executorId) = clock.getTimeMillis()
           eventLoopThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               val unknownExecutor = !scheduler.executorHeartbeatReceived(
-                executorId, accumUpdates, blockManagerId, executorMetrics)
+                executorId, accumUpdates, blockManagerId, executorUpdates)
               val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
               context.reply(response)
             }
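
To make the new message shape concrete, a heartbeat under the changed signature could be assembled as below. This is a hypothetical sketch with made-up stage keys and values; the ExecutorMetrics constructors are private[spark], so it only compiles inside Spark's own packages, and "JVMHeapMemory" is one of the metric names registered in ExecutorMetricType.metricToOffset:

```scala
import scala.collection.mutable.HashMap

import org.apache.spark.executor.ExecutorMetrics

// One peaks entry per active (stageId, stageAttemptId); illustrative values.
val executorUpdates = new HashMap[(Int, Int), ExecutorMetrics]
executorUpdates.put((3, 0), new ExecutorMetrics(Map("JVMHeapMemory" -> 536870912L)))
executorUpdates.put((3, 1), new ExecutorMetrics(Map("JVMHeapMemory" -> 268435456L)))
// Heartbeat("1", accumUpdates, blockManagerId, executorUpdates) now carries one
// peaks entry per active stage attempt instead of a single executor-wide value.
```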

core/src/main/scala/org/apache/spark/Heartbeater.scala

Lines changed: 0 additions & 22 deletions
@@ -19,23 +19,18 @@ package org.apache.spark
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
-import org.apache.spark.memory.MemoryManager
-import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * Creates a heartbeat thread which will call the specified reportHeartbeat function at
  * intervals of intervalMs.
  *
- * @param memoryManager the memory manager for execution and storage memory.
  * @param reportHeartbeat the heartbeat reporting function to call.
  * @param name the thread name for the heartbeater.
  * @param intervalMs the interval between heartbeats.
  */
 private[spark] class Heartbeater(
-    memoryManager: MemoryManager,
     reportHeartbeat: () => Unit,
     name: String,
     intervalMs: Long) extends Logging {
@@ -58,21 +53,4 @@
     heartbeater.shutdown()
     heartbeater.awaitTermination(10, TimeUnit.SECONDS)
   }
-
-  /**
-   * Get the current executor level metrics. These are returned as an array, with the index
-   * determined by ExecutorMetricType.metricToOffset
-   */
-  def getCurrentMetrics(): ExecutorMetrics = {
-
-    val metrics = new Array[Long](ExecutorMetricType.numMetrics)
-    var offset = 0
-    ExecutorMetricType.metricGetters.foreach { metric =>
-      val newMetrics = metric.getMetricValues(memoryManager)
-      Array.copy(newMetrics, 0, metrics, offset, newMetrics.size)
-      offset += newMetrics.length
-    }
-    new ExecutorMetrics(metrics)
-  }
 }
-

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 3 deletions
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -516,7 +517,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
     // create and start the heartbeater for collecting memory metrics
-    _heartbeater = new Heartbeater(env.memoryManager,
+    _heartbeater = new Heartbeater(
       () => SparkContext.this.reportHeartBeat(),
       "driver-heartbeater",
       conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
@@ -2425,10 +2426,13 @@
 
   /** Reports heartbeat metrics for the driver. */
   private def reportHeartBeat(): Unit = {
-    val driverUpdates = _heartbeater.getCurrentMetrics()
+    val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
+    val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics]
+    // In the driver, we do not track per-stage metrics, so use a dummy stage for the key
+    driverUpdates.put(EventLoggingListener.DRIVER_STAGE_KEY, new ExecutorMetrics(currentMetrics))
     val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
     listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
-      Some(driverUpdates)))
+      driverUpdates))
   }
 
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
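
The EventLoggingListener side of this change is not part of the visible diff, so DRIVER_STAGE_KEY's value does not appear here; presumably it is a sentinel pair along these lines:

```scala
// Assumption: a sentinel (stageId, stageAttemptId) that no real stage attempt
// can use, under which the driver's own metrics are reported.
val DRIVER_STAGE_KEY: (Int, Int) = (-1, -1)
```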

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 9 additions & 2 deletions
@@ -128,7 +128,8 @@ case class ExceptionFailure(
     fullStackTrace: String,
     private val exceptionWrapper: Option[ThrowableSerializationWrapper],
     accumUpdates: Seq[AccumulableInfo] = Seq.empty,
-    private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
+    private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil,
+    private[spark] var metricPeaks: Seq[Long] = Seq.empty)
   extends TaskFailedReason {
 
   /**
@@ -153,6 +154,11 @@
     this
   }
 
+  private[spark] def withMetricPeaks(metricPeaks: Seq[Long]): ExceptionFailure = {
+    this.metricPeaks = metricPeaks
+    this
+  }
+
   def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))
 
   override def toErrorString: String =
@@ -215,7 +221,8 @@ case object TaskResultLost extends TaskFailedReason {
 case class TaskKilled(
     reason: String,
     accumUpdates: Seq[AccumulableInfo] = Seq.empty,
-    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil,
+    metricPeaks: Seq[Long] = Seq.empty)
   extends TaskFailedReason {
 
   override def toErrorString: String = s"TaskKilled ($reason)"
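
Adding the peaks as a private[spark] var with a builder-style setter, rather than as another public constructor parameter, presumably keeps the case class's public shape stable for existing callers. The Executor.scala hunks below chain it like this:

```scala
// Usage mirrored from the Executor.scala diff later in this commit: attach
// accumulators and metric peaks before serializing the failure reason.
val ef = new ExceptionFailure(t, accUpdates)
  .withAccums(accums)
  .withMetricPeaks(metricPeaks)
ser.serialize(ef)
```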

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 51 additions & 11 deletions
@@ -27,7 +27,7 @@ import java.util.concurrent._
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, Map, WrappedArray}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
@@ -188,9 +188,20 @@ private[spark] class Executor(
    */
  private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
 
+  /**
+   * Interval to poll for executor metrics, in milliseconds
+   */
+  private val METRICS_POLLING_INTERVAL_MS = conf.get(EXECUTOR_METRICS_POLLING_INTERVAL)
+
+  private val pollOnHeartbeat = if (METRICS_POLLING_INTERVAL_MS > 0) false else true
+
+  // Poller for the memory metrics. Visible for testing.
+  private[executor] val metricsPoller = new ExecutorMetricsPoller(
+    env.memoryManager,
+    METRICS_POLLING_INTERVAL_MS)
+
   // Executor for the heartbeat task.
   private val heartbeater = new Heartbeater(
-    env.memoryManager,
     () => Executor.this.reportHeartBeat(),
     "executor-heartbeater",
     HEARTBEAT_INTERVAL_MS)
@@ -207,6 +218,8 @@ private[spark] class Executor(
 
   heartbeater.start()
 
+  metricsPoller.start()
+
   private[executor] def numRunningTasks: Int = runningTasks.size()
 
   def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
@@ -254,12 +267,18 @@
 
   def stop(): Unit = {
     env.metricsSystem.report()
+    try {
+      metricsPoller.stop()
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Unable to stop executor metrics poller", e)
+    }
     try {
       heartbeater.stop()
     } catch {
       case NonFatal(e) =>
         logWarning("Unable to stop heartbeater", e)
-     }
+    }
     threadPool.shutdown()
 
     // Notify plugins that executor is shutting down so they can terminate cleanly
@@ -380,6 +399,7 @@
     var taskStartTimeNs: Long = 0
     var taskStartCpu: Long = 0
     startGCTime = computeTotalGcTime()
+    var taskStarted: Boolean = false
 
     try {
       // Must be set before updateDependencies() is called, in case fetching dependencies
@@ -412,6 +432,9 @@
         env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
       }
 
+      metricsPoller.onTaskStart(taskId, task.stageId, task.stageAttemptId)
+      taskStarted = true
+
       // Run the actual task and measure its runtime.
       taskStartTimeNs = System.nanoTime()
       taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
@@ -529,8 +552,9 @@
 
       // Note: accumulator updates must be collected after TaskMetrics is updated
       val accumUpdates = task.collectAccumulatorUpdates()
+      val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
       // TODO: do not serialize value twice
-      val directResult = new DirectTaskResult(valueBytes, accumUpdates)
+      val directResult = new DirectTaskResult(valueBytes, accumUpdates, metricPeaks)
       val serializedDirectResult = ser.serialize(directResult)
       val resultSize = serializedDirectResult.limit()
 
@@ -559,13 +583,15 @@
       executorSource.SUCCEEDED_TASKS.inc(1L)
       setTaskFinishedAndClearInterruptStatus()
       execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
-
     } catch {
       case t: TaskKilledException =>
         logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
 
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-        val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums))
+        // Here and below, put task metric peaks in a WrappedArray to expose them as a Seq
+        // without requiring a copy.
+        val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
+        val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums, metricPeaks))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
       case _: InterruptedException | NonFatal(_) if
@@ -574,7 +600,8 @@
         logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
 
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-        val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
+        val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
+        val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums, metricPeaks))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
       case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
@@ -609,14 +636,19 @@
         // instead of an app issue).
         if (!ShutdownHookManager.inShutdown()) {
           val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
+          val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
 
           val serializedTaskEndReason = {
             try {
-              ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
+              val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
+                .withMetricPeaks(metricPeaks)
+              ser.serialize(ef)
             } catch {
               case _: NotSerializableException =>
                 // t is not serializable so just send the stacktrace
-                ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+                val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
+                  .withMetricPeaks(metricPeaks)
+                ser.serialize(ef)
             }
           }
           setTaskFinishedAndClearInterruptStatus()
@@ -632,6 +664,11 @@
       }
     } finally {
       runningTasks.remove(taskId)
+      if (taskStarted) {
+        // This means the task was successfully deserialized, its stageId and stageAttemptId
+        // are known, and metricsPoller.onTaskStart was called.
+        metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
+      }
     }
   }
 
@@ -848,8 +885,11 @@
     val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
     val curGCTime = computeTotalGcTime()
 
-    // get executor level memory metrics
-    val executorUpdates = heartbeater.getCurrentMetrics()
+    if (pollOnHeartbeat) {
+      metricsPoller.poll()
+    }
+
+    val executorUpdates = metricsPoller.getExecutorUpdates()
 
     for (taskRunner <- runningTasks.values().asScala) {
       if (taskRunner.task != null) {
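
The EXECUTOR_METRICS_POLLING_INTERVAL constant referenced above is defined in a config file not shown in this view. Assuming it is backed by a key named spark.executor.metrics.pollingInterval, the two polling modes that pollOnHeartbeat switches between would be selected like this:

```scala
import org.apache.spark.SparkConf

// Assumed key name; the ConfigEntry definition is in a file hidden from this view.
val conf = new SparkConf()
  // A positive interval starts a dedicated poller thread (here, every 100 ms),
  // which makes pollOnHeartbeat above false.
  .set("spark.executor.metrics.pollingInterval", "100ms")

// Left at its default of 0, no poller thread runs: metricsPoller.poll() is
// invoked only from reportHeartBeat(), i.e. once per executor heartbeat
// (spark.executor.heartbeatInterval, 10s by default), as before this change.
```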

core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala

Lines changed: 33 additions & 2 deletions
@@ -16,7 +16,10 @@
  */
 package org.apache.spark.executor
 
+import java.util.concurrent.atomic.AtomicLongArray
+
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.memory.MemoryManager
 import org.apache.spark.metrics.ExecutorMetricType
 
 /**
@@ -46,14 +49,21 @@ class ExecutorMetrics private[spark] extends Serializable {
     Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size))
   }
 
+  private[spark] def this(metrics: AtomicLongArray) {
+    this()
+    ExecutorMetricType.metricToOffset.foreach { case (_, i) =>
+      this.metrics(i) = metrics.get(i)
+    }
+  }
+
   /**
-   * Constructor: create the ExecutorMetrics with using a given map.
+   * Constructor: create the ExecutorMetrics using a given map.
    *
    * @param executorMetrics map of executor metric name to value
    */
   private[spark] def this(executorMetrics: Map[String, Long]) {
     this()
-    ExecutorMetricType.metricToOffset.foreach { case(name, idx) =>
+    ExecutorMetricType.metricToOffset.foreach { case (name, idx) =>
       metrics(idx) = executorMetrics.getOrElse(name, 0L)
     }
   }
@@ -76,3 +86,24 @@
     updated
   }
 }
+
+private[spark] object ExecutorMetrics {
+
+  /**
+   * Get the current executor metrics. These are returned as an array, with the index
+   * determined by ExecutorMetricType.metricToOffset.
+   *
+   * @param memoryManager the memory manager for execution and storage memory
+   * @return the values of the metrics
+   */
+  def getCurrentMetrics(memoryManager: MemoryManager): Array[Long] = {
+    val currentMetrics = new Array[Long](ExecutorMetricType.numMetrics)
+    var offset = 0
+    ExecutorMetricType.metricGetters.foreach { metricType =>
+      val metricValues = metricType.getMetricValues(memoryManager)
+      Array.copy(metricValues, 0, currentMetrics, offset, metricValues.length)
+      offset += metricValues.length
+    }
+    currentMetrics
+  }
+}
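
Taken together, the new AtomicLongArray constructor and getCurrentMetrics give a poller a lock-free path from raw samples to an immutable snapshot. A minimal sketch of how they might compose, assuming code inside the org.apache.spark.executor package (the actual poller is hidden from this view):

```scala
import java.util.concurrent.atomic.AtomicLongArray

import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.ExecutorMetricType

// Fold one polled sample into a concurrently updated peaks array, then snapshot
// the peaks into an immutable ExecutorMetrics for inclusion in a heartbeat.
def updateAndSnapshot(peaks: AtomicLongArray, memoryManager: MemoryManager): ExecutorMetrics = {
  val current = ExecutorMetrics.getCurrentMetrics(memoryManager)
  (0 until ExecutorMetricType.numMetrics).foreach { i =>
    // getAndAccumulate applies max atomically, so a concurrent heartbeat
    // reading the peaks never sees a torn update.
    peaks.getAndAccumulate(i, current(i), (a: Long, b: Long) => math.max(a, b))
  }
  new ExecutorMetrics(peaks) // the new AtomicLongArray constructor added above
}
```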
