[SPARK-26329][CORE] Faster polling of executor memory metrics. #23767

Closed
wants to merge 22 commits into from
Changes from all commits

Commits (22)
9f5b195
[SPARK-26329][CORE] Faster polling of executor memory metrics.
wypoon Jan 4, 2019
03e41a8
[SPARK-26329][CORE] Fix test compilation error in sql.
wypoon Feb 12, 2019
7f6bd74
[SPARK-26329][CORE] Fix Mima issues.
wypoon Feb 13, 2019
7397897
[SPARK-26329][CORE] Fix possible NPE.
wypoon Feb 13, 2019
e1aeafc
[SPARK-26329][CORE] Fix JsonProtocolSuite post-rebase to account for …
wypoon Mar 6, 2019
75ba39d
[SPARK-26329][CORE] Extract polling logic into a separate class.
wypoon Mar 8, 2019
ea2ff0d
[SPARK-26329][CORE] On task failure, send executor metrics in the Tas…
wypoon Mar 18, 2019
0cbfc04
[SPARK-26329][CORE] Unit tests for sending executor metrics in TaskRe…
wypoon Mar 21, 2019
8cb30a8
[SPARK-26329][CORE] Add driver updates to test for executor metrics a…
wypoon Mar 22, 2019
077abb0
[SPARK-26329][CORE] Add SparkListenerTaskEnd events to test for execu…
wypoon Mar 23, 2019
7a3c90d
[SPARK-26329][CORE] Address feedback from irashid.
wypoon Mar 27, 2019
3ed583a
[SPARK-26329][CORE] Fix ExecutorSuite failures.
wypoon Mar 28, 2019
0a4828a
[SPARK-26329][CORE] Delete a comment on irashid's suggestion.
wypoon Mar 28, 2019
9530b75
[SPARK-26329][CORE] Change executorUpdates to be a scala.collection.m…
wypoon Mar 28, 2019
38a397c
[SPARK-26329][CORE] Update HistoryServerSuite.
wypoon Apr 9, 2019
e062e60
[SPARK-26329][CORE] Get executor updates and reset the peaks in a sin…
wypoon May 15, 2019
20b4b7e
[SPARK-26329][CORE] Test fixes after rebase on master.
wypoon Jul 3, 2019
b898ad2
[SPARK-26329][CORE] Adopt some suggestions from attilapiros.
wypoon Jul 4, 2019
fbb55bf
[SPARK-26329][CORE] Address feedback from Imran Rashid.
wypoon Jul 19, 2019
99addf1
[SPARK-26329][CORE] Make TCMP case class private.
wypoon Jul 19, 2019
7331b27
[SPARK-26329][CORE] Fix a test post-rebase.
wypoon Jul 29, 2019
7556d6a
[SPARK-26329][CORE] Update a doc comment based on feedback from Imran…
wypoon Jul 31, 2019
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -19,7 +19,7 @@ package org.apache.spark

import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.collection.mutable
import scala.collection.mutable.{HashMap, Map}
import scala.concurrent.Future

import org.apache.spark.executor.ExecutorMetrics
@@ -38,9 +38,11 @@ import org.apache.spark.util._
*/
private[spark] case class Heartbeat(
executorId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
// taskId -> accumulator updates
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorUpdates: ExecutorMetrics) // executor level updates
// (stageId, stageAttemptId) -> executor metric peaks
executorUpdates: Map[(Int, Int), ExecutorMetrics])

/**
* An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
@@ -73,7 +75,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
private[spark] var scheduler: TaskScheduler = null

// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]
private val executorLastSeen = new HashMap[String, Long]

private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)

@@ -120,14 +122,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)

// Messages received from executors
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorMetrics) =>
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorUpdates) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, accumUpdates, blockManagerId, executorMetrics)
executorId, accumUpdates, blockManagerId, executorUpdates)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
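
To make the new heartbeat payload concrete, here is a minimal, self-contained sketch (not Spark code) of the shape of the executorUpdates field: peaks are grouped per (stageId, stageAttemptId), with one Long per metric as in ExecutorMetrics. The metric values below are made up for illustration.

    import scala.collection.mutable.HashMap

    object HeartbeatPayloadSketch {
      def main(args: Array[String]): Unit = {
        // (stageId, stageAttemptId) -> executor metric peaks seen while that stage had running tasks
        val executorUpdates = new HashMap[(Int, Int), Array[Long]]
        executorUpdates.put((0, 0), Array(512L * 1024 * 1024, 64L * 1024 * 1024))
        executorUpdates.put((1, 0), Array(768L * 1024 * 1024, 32L * 1024 * 1024))
        executorUpdates.foreach { case ((stageId, attemptId), peaks) =>
          println(s"stage $stageId (attempt $attemptId): peaks = ${peaks.mkString(", ")}")
        }
      }
    }
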
22 changes: 0 additions & 22 deletions core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -19,23 +19,18 @@ package org.apache.spark

import java.util.concurrent.TimeUnit

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Creates a heartbeat thread which will call the specified reportHeartbeat function at
* intervals of intervalMs.
*
* @param memoryManager the memory manager for execution and storage memory.
* @param reportHeartbeat the heartbeat reporting function to call.
* @param name the thread name for the heartbeater.
* @param intervalMs the interval between heartbeats.
*/
private[spark] class Heartbeater(
memoryManager: MemoryManager,
reportHeartbeat: () => Unit,
name: String,
intervalMs: Long) extends Logging {
@@ -58,21 +53,4 @@ private[spark] class Heartbeater(
heartbeater.shutdown()
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
}

/**
* Get the current executor level metrics. These are returned as an array, with the index
* determined by ExecutorMetricType.metricToOffset
*/
def getCurrentMetrics(): ExecutorMetrics = {

val metrics = new Array[Long](ExecutorMetricType.numMetrics)
var offset = 0
ExecutorMetricType.metricGetters.foreach { metric =>
val newMetrics = metric.getMetricValues(memoryManager)
Array.copy(newMetrics, 0, metrics, offset, newMetrics.size)
offset += newMetrics.length
}
new ExecutorMetrics(metrics)
}
}
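
With the MemoryManager argument gone, Heartbeater is now just a named, scheduled callback. Below is a simplified standalone sketch of that pattern; the real class additionally uses a random initial delay, Spark's ThreadUtils daemon-thread factory, and error logging around the callback.

    import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

    class SimpleHeartbeater(reportHeartbeat: () => Unit, name: String, intervalMs: Long) {
      // Single scheduler thread, labeled with the given name.
      private val scheduler = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory { override def newThread(r: Runnable): Thread = new Thread(r, name) })

      def start(): Unit = {
        val heartbeatTask = new Runnable { override def run(): Unit = reportHeartbeat() }
        scheduler.scheduleAtFixedRate(heartbeatTask, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
      }

      def stop(): Unit = {
        scheduler.shutdown()
        scheduler.awaitTermination(10, TimeUnit.SECONDS)
      }
    }
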

10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -516,7 +517,7 @@ class SparkContext(config: SparkConf) extends Logging {
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// create and start the heartbeater for collecting memory metrics
_heartbeater = new Heartbeater(env.memoryManager,
_heartbeater = new Heartbeater(
() => SparkContext.this.reportHeartBeat(),
"driver-heartbeater",
conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
@@ -2425,10 +2426,13 @@ class SparkContext(config: SparkConf) extends Logging {

/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
val driverUpdates = _heartbeater.getCurrentMetrics()
val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics]
// In the driver, we do not track per-stage metrics, so use a dummy stage for the key
driverUpdates.put(EventLoggingListener.DRIVER_STAGE_KEY, new ExecutorMetrics(currentMetrics))
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
Some(driverUpdates)))
driverUpdates))
}

// In order to prevent multiple SparkContexts from being active at the same time, mark this
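
The driver does not run tasks, so its heartbeat reuses the per-stage map with a single sentinel key. A toy illustration of that convention follows; the (-1, -1) key is an assumption matching EventLoggingListener.DRIVER_STAGE_KEY, and the metric values are fabricated.

    import scala.collection.mutable.HashMap

    object DriverUpdatesSketch {
      // Assumed sentinel (stageId, stageAttemptId) for driver-level metrics.
      val DriverStageKey: (Int, Int) = (-1, -1)

      def main(args: Array[String]): Unit = {
        val currentMetrics: Array[Long] = Array(2048L, 512L)  // stand-in for getCurrentMetrics output
        val driverUpdates = new HashMap[(Int, Int), Array[Long]]
        driverUpdates.put(DriverStageKey, currentMetrics)
        driverUpdates.foreach { case (key, values) => println(s"$key -> ${values.mkString(", ")}") }
      }
    }
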
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -128,7 +128,8 @@ case class ExceptionFailure(
fullStackTrace: String,
private val exceptionWrapper: Option[ThrowableSerializationWrapper],
accumUpdates: Seq[AccumulableInfo] = Seq.empty,
private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil,
private[spark] var metricPeaks: Seq[Long] = Seq.empty)
extends TaskFailedReason {

/**
@@ -153,6 +154,11 @@ case class ExceptionFailure(
this
}

private[spark] def withMetricPeaks(metricPeaks: Seq[Long]): ExceptionFailure = {
this.metricPeaks = metricPeaks
this
}

def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))

override def toErrorString: String =
@@ -215,7 +221,8 @@ case object TaskResultLost extends TaskFailedReason {
case class TaskKilled(
reason: String,
accumUpdates: Seq[AccumulableInfo] = Seq.empty,
private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil,
metricPeaks: Seq[Long] = Seq.empty)
extends TaskFailedReason {

override def toErrorString: String = s"TaskKilled ($reason)"
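
The withAccums / withMetricPeaks methods above mutate private var fields after construction and return this, so optional failure details can be chained onto an already-built reason. A tiny standalone illustration of the same pattern (the names here are invented, not Spark classes):

    case class FailureSketch(
        message: String,
        private var accums: Seq[String] = Nil,
        private var metricPeaks: Seq[Long] = Seq.empty) {

      def withAccums(a: Seq[String]): FailureSketch = { accums = a; this }
      def withMetricPeaks(peaks: Seq[Long]): FailureSketch = { metricPeaks = peaks; this }
      def describe: String = s"$message, accums=$accums, peaks=$metricPeaks"
    }

    object FailureSketchDemo {
      def main(args: Array[String]): Unit = {
        val failure = FailureSketch("task failed")
          .withAccums(Seq("bytesRead"))
          .withMetricPeaks(Seq(42L, 7L))
        println(failure.describe)
      }
    }
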
62 changes: 51 additions & 11 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -27,7 +27,7 @@ import java.util.concurrent._
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.collection.mutable.{ArrayBuffer, HashMap, Map, WrappedArray}
import scala.concurrent.duration._
import scala.util.control.NonFatal

@@ -188,9 +188,20 @@ private[spark] class Executor(
*/
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)

/**
* Interval to poll for executor metrics, in milliseconds
*/
private val METRICS_POLLING_INTERVAL_MS = conf.get(EXECUTOR_METRICS_POLLING_INTERVAL)

private val pollOnHeartbeat = if (METRICS_POLLING_INTERVAL_MS > 0) false else true

// Poller for the memory metrics. Visible for testing.
private[executor] val metricsPoller = new ExecutorMetricsPoller(
env.memoryManager,
METRICS_POLLING_INTERVAL_MS)

// Executor for the heartbeat task.
private val heartbeater = new Heartbeater(
env.memoryManager,
() => Executor.this.reportHeartBeat(),
"executor-heartbeater",
HEARTBEAT_INTERVAL_MS)
@@ -207,6 +218,8 @@

heartbeater.start()

metricsPoller.start()

private[executor] def numRunningTasks: Int = runningTasks.size()

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
@@ -254,12 +267,18 @@

def stop(): Unit = {
env.metricsSystem.report()
try {
metricsPoller.stop()
} catch {
case NonFatal(e) =>
logWarning("Unable to stop executor metrics poller", e)
}
try {
heartbeater.stop()
} catch {
case NonFatal(e) =>
logWarning("Unable to stop heartbeater", e)
}
}
threadPool.shutdown()

// Notify plugins that executor is shutting down so they can terminate cleanly
@@ -380,6 +399,7 @@
var taskStartTimeNs: Long = 0
var taskStartCpu: Long = 0
startGCTime = computeTotalGcTime()
var taskStarted: Boolean = false

try {
// Must be set before updateDependencies() is called, in case fetching dependencies
@@ -412,6 +432,9 @@
env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
}

metricsPoller.onTaskStart(taskId, task.stageId, task.stageAttemptId)
taskStarted = true

// Run the actual task and measure its runtime.
taskStartTimeNs = System.nanoTime()
taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
@@ -529,8 +552,9 @@

// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val directResult = new DirectTaskResult(valueBytes, accumUpdates, metricPeaks)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit()

@@ -559,13 +583,15 @@
executorSource.SUCCEEDED_TASKS.inc(1L)
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

} catch {
case t: TaskKilledException =>
logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")

val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums))
// Here and below, put task metric peaks in a WrappedArray to expose them as a Seq
// without requiring a copy.
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums, metricPeaks))
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

case _: InterruptedException | NonFatal(_) if
@@ -574,7 +600,8 @@
logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")

val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums, metricPeaks))
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)

case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
Member:
@wypoon Is there any special reason that metricPeaks is not sent in this case and for CommitDeniedException?

wypoon (Contributor Author), Aug 19, 2019:
There is no special reason. I saw that in some cases, accumulators are collected and sent in the TaskFailedReason, and in those cases I send the task metric peaks as well; in the other cases, accumulators are not collected so I don't send the task metric peaks either.
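
For context on the WrappedArray.make calls added in this file, a small standalone example showing why they avoid a copy: the resulting Seq is a view over the same underlying array (WrappedArray is the Scala 2.12 API this PR targets).

    import scala.collection.mutable.WrappedArray

    object WrappedArraySketch {
      def main(args: Array[String]): Unit = {
        val peaks: Array[Long] = Array(10L, 20L, 30L)
        val peaksAsSeq: Seq[Long] = WrappedArray.make[Long](peaks)  // wraps, does not copy
        peaks(0) = 99L
        println(peaksAsSeq.head)  // prints 99: the Seq sees mutations of the original array
      }
    }
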

@@ -609,14 +636,19 @@
// instead of an app issue).
if (!ShutdownHookManager.inShutdown()) {
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))

val serializedTaskEndReason = {
try {
ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
.withMetricPeaks(metricPeaks)
ser.serialize(ef)
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
.withMetricPeaks(metricPeaks)
ser.serialize(ef)
}
}
setTaskFinishedAndClearInterruptStatus()
@@ -632,6 +664,11 @@
}
} finally {
runningTasks.remove(taskId)
if (taskStarted) {
// This means the task was successfully deserialized, its stageId and stageAttemptId
// are known, and metricsPoller.onTaskStart was called.
metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
}
}
}

@@ -848,8 +885,11 @@
val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
val curGCTime = computeTotalGcTime()

// get executor level memory metrics
val executorUpdates = heartbeater.getCurrentMetrics()
if (pollOnHeartbeat) {
metricsPoller.poll()
}

val executorUpdates = metricsPoller.getExecutorUpdates()

for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
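
A condensed, self-contained sketch of the polling arrangement this file sets up. The poller below is a stand-in with the same call sites as the diff (onTaskStart, poll, getTaskMetricPeaks, onTaskCompletion), not the real ExecutorMetricsPoller, which samples the MemoryManager and also tracks per-stage peaks; when the polling interval is positive it polls on its own schedule, otherwise the heartbeat thread polls just before building each heartbeat.

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicLongArray

    import scala.collection.JavaConverters._

    // Stand-in for the PR's ExecutorMetricsPoller: tracks per-task metric peaks.
    class SketchMetricsPoller(numMetrics: Int, sample: () => Array[Long]) {
      private val taskPeaks = new ConcurrentHashMap[Long, AtomicLongArray]()

      def onTaskStart(taskId: Long, stageId: Int, stageAttemptId: Int): Unit =
        taskPeaks.putIfAbsent(taskId, new AtomicLongArray(numMetrics))

      def poll(): Unit = {
        val current = sample()
        taskPeaks.values().asScala.foreach { peaks =>
          var i = 0
          while (i < numMetrics) {
            // Fold the newly sampled value into the running maximum for each metric.
            peaks.accumulateAndGet(i, current(i), (a: Long, b: Long) => math.max(a, b))
            i += 1
          }
        }
      }

      def getTaskMetricPeaks(taskId: Long): Array[Long] = {
        val peaks = taskPeaks.get(taskId)
        Array.tabulate(numMetrics)(i => if (peaks == null) 0L else peaks.get(i))
      }

      def onTaskCompletion(taskId: Long, stageId: Int, stageAttemptId: Int): Unit =
        taskPeaks.remove(taskId)
    }

    object PollerWiringSketch {
      def main(args: Array[String]): Unit = {
        val pollingIntervalMs = 0L                    // stand-in for EXECUTOR_METRICS_POLLING_INTERVAL
        val pollOnHeartbeat = pollingIntervalMs <= 0  // fall back to polling on each heartbeat
        val poller = new SketchMetricsPoller(
          numMetrics = 2,
          sample = () => Array(Runtime.getRuntime.totalMemory(), Runtime.getRuntime.freeMemory()))
        poller.onTaskStart(taskId = 1L, stageId = 0, stageAttemptId = 0)
        if (pollOnHeartbeat) poller.poll()            // mirrors the new check in reportHeartBeat()
        println(poller.getTaskMetricPeaks(1L).mkString(", "))
        poller.onTaskCompletion(taskId = 1L, stageId = 0, stageAttemptId = 0)
      }
    }
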
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -16,7 +16,10 @@
*/
package org.apache.spark.executor

import java.util.concurrent.atomic.AtomicLongArray

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.ExecutorMetricType

/**
@@ -46,14 +49,21 @@ class ExecutorMetrics private[spark] extends Serializable {
Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size))
}

private[spark] def this(metrics: AtomicLongArray) {
this()
ExecutorMetricType.metricToOffset.foreach { case (_, i) =>
this.metrics(i) = metrics.get(i)
}
}

/**
* Constructor: create the ExecutorMetrics with using a given map.
* Constructor: create the ExecutorMetrics using a given map.
*
* @param executorMetrics map of executor metric name to value
*/
private[spark] def this(executorMetrics: Map[String, Long]) {
this()
ExecutorMetricType.metricToOffset.foreach { case(name, idx) =>
ExecutorMetricType.metricToOffset.foreach { case (name, idx) =>
metrics(idx) = executorMetrics.getOrElse(name, 0L)
}
}
@@ -76,3 +86,24 @@
updated
}
}

private[spark] object ExecutorMetrics {

/**
* Get the current executor metrics. These are returned as an array, with the index
* determined by ExecutorMetricType.metricToOffset.
*
* @param memoryManager the memory manager for execution and storage memory
* @return the values of the metrics
*/
def getCurrentMetrics(memoryManager: MemoryManager): Array[Long] = {
val currentMetrics = new Array[Long](ExecutorMetricType.numMetrics)
var offset = 0
ExecutorMetricType.metricGetters.foreach { metricType =>
val metricValues = metricType.getMetricValues(memoryManager)
Array.copy(metricValues, 0, currentMetrics, offset, metricValues.length)
offset += metricValues.length
}
currentMetrics
}
}
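
The new companion-object method packs the values returned by each metric getter into one flat array using a running offset. A toy version of the same loop with stand-in getters (the real code iterates ExecutorMetricType.metricGetters over the MemoryManager):

    object CurrentMetricsSketch {
      // Stand-ins for ExecutorMetricType.metricGetters: a getter may contribute several values.
      private val metricGetters: Seq[() => Array[Long]] = Seq(
        () => Array(Runtime.getRuntime.totalMemory()),
        () => Array(Runtime.getRuntime.freeMemory(), Runtime.getRuntime.maxMemory()))

      private val numMetrics: Int = metricGetters.map(_.apply().length).sum

      def getCurrentMetrics(): Array[Long] = {
        val currentMetrics = new Array[Long](numMetrics)
        var offset = 0
        metricGetters.foreach { getter =>
          val metricValues = getter()
          Array.copy(metricValues, 0, currentMetrics, offset, metricValues.length)
          offset += metricValues.length
        }
        currentMetrics
      }

      def main(args: Array[String]): Unit =
        println(getCurrentMetrics().mkString(", "))
    }
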