Commit 577d442

Respond to some code review feedback. Make executorMetrics an Option.
1 parent f3e5704 · commit 577d442

7 files changed, +57 -36 lines changed

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 1 addition & 1 deletion
@@ -18,11 +18,11 @@
 package org.apache.spark.network.netty
 
 import java.nio.ByteBuffer
+import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
-import scala.tools.nsc.interpreter.JList
 
 import io.netty.buffer._

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 1 deletion
@@ -239,7 +239,8 @@ class DAGScheduler(
       // (taskId, stageId, stageAttemptId, accumUpdates)
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
       blockManagerId: BlockManagerId): Boolean = {
-    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, executorMetrics, accumUpdates))
+    listenerBus.post(
+      SparkListenerExecutorMetricsUpdate(execId, accumUpdates, Some(executorMetrics)))
     blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 34 additions & 25 deletions
@@ -89,8 +89,8 @@ private[spark] class EventLoggingListener(
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
 
   private val executorIdToLatestMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate]
-  private val executorIdToModifiedMaxMetrics = new
-    HashMap[String, SparkListenerExecutorMetricsUpdate]
+  private val executorIdToModifiedMaxMetrics =
+    new HashMap[String, SparkListenerExecutorMetricsUpdate]
 
   /**
    * Creates the log file in the configured log directory.
@@ -242,7 +242,7 @@ private[spark] class EventLoggingListener(
     // We only track the executor metrics in each stage, so we drop the task metrics as they are
     // quite verbose
     val eventWithoutTaskMetrics = SparkListenerExecutorMetricsUpdate(
-      event.execId, event.executorMetrics, Seq.empty)
+      event.execId, Seq.empty, event.executorMetrics)
     executorIdToLatestMetrics(eventWithoutTaskMetrics.execId) = eventWithoutTaskMetrics
     updateModifiedMetrics(eventWithoutTaskMetrics)
   }
@@ -295,6 +295,9 @@ private[spark] class EventLoggingListener(
    * Does this event match the ID of an executor we are already tracking?
    * If no, start tracking metrics for this executor, starting at this event.
    * If yes, compare time stamps, and perhaps update using this event.
+   * Only do this if executorMetrics is present in the toBeModifiedEvent.
+   * If it is not - meaning we are processing historical data created
+   * without executorMetrics - simply cache the latestEvent.
    * @param latestEvent the latest event received, used to update our map of stored metrics.
    */
   private def updateModifiedMetrics(latestEvent: SparkListenerExecutorMetricsUpdate): Unit = {
@@ -304,30 +307,36 @@
       case None =>
         executorIdToModifiedMaxMetrics(executorId) = latestEvent
       case Some(toBeModifiedEvent) =>
-        val toBeModifiedTransportMetrics = toBeModifiedEvent.executorMetrics.transportMetrics
-        val latestTransportMetrics = latestEvent.executorMetrics.transportMetrics
-        var timeStamp: Long = toBeModifiedTransportMetrics.timeStamp
-
-        val onHeapSize =
-          if (latestTransportMetrics.onHeapSize > toBeModifiedTransportMetrics.onHeapSize) {
-            timeStamp = latestTransportMetrics.timeStamp
-            latestTransportMetrics.onHeapSize
-          } else {
-            toBeModifiedTransportMetrics.onHeapSize
-          }
-        val offHeapSize =
-          if (latestTransportMetrics.offHeapSize > toBeModifiedTransportMetrics.offHeapSize) {
-            timeStamp = latestTransportMetrics.timeStamp
-            latestTransportMetrics.offHeapSize
-          } else {
-            toBeModifiedTransportMetrics.offHeapSize
-          }
-        val modifiedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.hostname,
-          toBeModifiedEvent.executorMetrics.port,
-          TransportMetrics(timeStamp, onHeapSize, offHeapSize))
-        val modifiedEvent = SparkListenerExecutorMetricsUpdate(
-          toBeModifiedEvent.execId, modifiedExecMetrics, toBeModifiedEvent.accumUpdates)
-        executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
+        if (toBeModifiedEvent.executorMetrics.isEmpty ||
+            latestEvent.executorMetrics.isEmpty) {
+          executorIdToModifiedMaxMetrics(executorId) = latestEvent
+        } else {
+          val prevTransportMetrics = toBeModifiedEvent.executorMetrics.get.transportMetrics
+          val latestTransportMetrics = latestEvent.executorMetrics.get.transportMetrics
+          var timeStamp: Long = prevTransportMetrics.timeStamp
+
+          val onHeapSize =
+            if (latestTransportMetrics.onHeapSize > prevTransportMetrics.onHeapSize) {
+              timeStamp = latestTransportMetrics.timeStamp
+              latestTransportMetrics.onHeapSize
+            } else {
+              prevTransportMetrics.onHeapSize
+            }
+          val offHeapSize =
+            if (latestTransportMetrics.offHeapSize > prevTransportMetrics.offHeapSize) {
+              timeStamp = latestTransportMetrics.timeStamp
+              latestTransportMetrics.offHeapSize
+            } else {
+              prevTransportMetrics.offHeapSize
+            }
+          val updatedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.get.hostname,
+            toBeModifiedEvent.executorMetrics.get.port,
+            TransportMetrics(timeStamp, onHeapSize, offHeapSize))
+          val modifiedEvent = SparkListenerExecutorMetricsUpdate(
+            toBeModifiedEvent.execId, toBeModifiedEvent.accumUpdates, Some(updatedExecMetrics))
+          executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
+        }
     }
   }
 }
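
The heart of this hunk is a per-field maximum: keep the larger of each heap size, and stamp the result with the timestamp of whichever event supplied a winning value. A minimal standalone sketch of that merge rule, using a stand-in TransportMetrics case class that mirrors the fields used above (illustration only, not the committed code):

// Stand-in mirroring the TransportMetrics fields referenced in the diff above.
case class TransportMetrics(timeStamp: Long, onHeapSize: Long, offHeapSize: Long)

// Keep the max of each size; the timestamp follows the side that last won.
def mergeMax(prev: TransportMetrics, latest: TransportMetrics): TransportMetrics = {
  var timeStamp = prev.timeStamp
  val onHeapSize = if (latest.onHeapSize > prev.onHeapSize) {
    timeStamp = latest.timeStamp
    latest.onHeapSize
  } else prev.onHeapSize
  val offHeapSize = if (latest.offHeapSize > prev.offHeapSize) {
    timeStamp = latest.timeStamp
    latest.offHeapSize
  } else prev.offHeapSize
  TransportMetrics(timeStamp, onHeapSize, offHeapSize)
}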

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 3 additions & 3 deletions
@@ -138,14 +138,14 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
 /**
  * Periodic updates from executors.
  * @param execId executor id
- * @param executorMetrics metrics in executor level
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
+ * @param executorMetrics keeps track of TransportMetrics for an executor. Added in Spark 2.3.
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
     execId: String,
-    executorMetrics: ExecutorMetrics,
-    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+    executorMetrics: Option[ExecutorMetrics])
   extends SparkListenerEvent
 
 @DeveloperApi
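
With executorMetrics moved to the last position and wrapped in an Option, listener implementations have to unwrap it; events replayed from logs written before this change carry None. A hedged sketch of a consumer (the listener class name is hypothetical; the ExecutorMetrics shape is the one introduced by this patch series):

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

class TransportMetricsPrinter extends SparkListener {
  override def onExecutorMetricsUpdate(
      event: SparkListenerExecutorMetricsUpdate): Unit = {
    // None when replaying history written without executor metrics.
    event.executorMetrics.foreach { metrics =>
      val t = metrics.transportMetrics
      println(s"${event.execId}: onHeap=${t.onHeapSize}, offHeap=${t.offHeapSize}")
    }
  }
}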

core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala

Lines changed: 3 additions & 3 deletions
@@ -57,16 +57,16 @@ class MemoryListener extends SparkListener {
     val executorMetrics = event.executorMetrics
     activeExecutorIdToMem
       .getOrElseUpdate(executorId, new MemoryUIInfo)
-      .updateMemUiInfo(executorMetrics)
+      .updateMemUiInfo(executorMetrics.get)
     activeStagesToMem.foreach { case (_, stageMemMetrics) =>
       // If executor is added in the stage running time, we also update the metrics for the
       // executor in {{activeStagesToMem}}
       if (!stageMemMetrics.contains(executorId)) {
         stageMemMetrics(executorId) = new MemoryUIInfo
       }
-      stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
+      stageMemMetrics(executorId).updateMemUiInfo(executorMetrics.get)
     }
-    latestExecIdToExecMetrics(executorId) = executorMetrics
+    latestExecIdToExecMetrics(executorId) = executorMetrics.get
   }
 
   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
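
Each `.get` above assumes the Option is always defined on the live update path; a None here would throw NoSuchElementException. If that invariant is not guaranteed (for example, when replaying old logs through this listener), a safer shape is to unwrap once. A sketch against the same names as the hunk above, illustration only:

// Sketch: unwrap the Option a single time and skip when metrics are absent.
event.executorMetrics match {
  case Some(metrics) =>
    activeExecutorIdToMem
      .getOrElseUpdate(executorId, new MemoryUIInfo)
      .updateMemUiInfo(metrics)
    latestExecIdToExecMetrics(executorId) = metrics
  case None => // event carries no executor metrics; nothing to record
}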

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 6 additions & 2 deletions
@@ -237,9 +237,13 @@ private[spark] object JsonProtocol {
     val execId = metricsUpdate.execId
     val executorMetrics = metricsUpdate.executorMetrics
     val accumUpdates = metricsUpdate.accumUpdates
+    val metricsJson: JValue = executorMetrics match {
+      case Some(metrics) => executorMetricsToJson(metrics)
+      case None => "none"
+    }
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
     ("Executor ID" -> execId) ~
-    ("Executor Metrics Updated" -> executorMetricsToJson(executorMetrics)) ~
+    ("Executor Metrics Updated" -> metricsJson) ~
     ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
       ("Task ID" -> taskId) ~
       ("Stage ID" -> stageId) ~
@@ -688,7 +692,7 @@ private[spark] object JsonProtocol {
         (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
       (taskId, stageId, stageAttemptId, updates)
     }
-    SparkListenerExecutorMetricsUpdate(execInfo, executorMetrics, accumUpdates)
+    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, Some(executorMetrics))
   }
 
 /** --------------------------------------------------------------------- *
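
Note the asymmetry between the two hunks: the write side emits either the metrics JSON or the literal string "none", while metricsUpdateFromJson wraps its parsed value in Some unconditionally. A hedged sketch of a read side that tolerates the "none" marker, assuming json4s (which JsonProtocol already uses) and a matching executorMetricsFromJson helper from this patch series:

import org.json4s._

// Map the "none" marker (or a missing field) back to None instead of forcing Some.
val executorMetrics: Option[ExecutorMetrics] =
  json \ "Executor Metrics Updated" match {
    case JString("none") | JNothing => None
    case metricsJson => Some(executorMetricsFromJson(metricsJson))
  }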

project/MimaExcludes.scala

Lines changed: 8 additions & 1 deletion
@@ -108,7 +108,14 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerBlockManagerAdded.apply"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.StorageStatus.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.this"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this"),
+
+      // [SPARK-9103] Update SparkListenerExecutorMetricsUpdate with new executorMetrics
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"),
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy$default$2")
     )
 
     // Exclude rules for 2.1.x
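
The copy$default$2 filter in particular is needed because reordering the case-class fields changes the compiler-generated default getters for copy. A toy illustration with hypothetical names:

// Before the change the second field was the metrics; afterwards it is accumUpdates,
// so the synthetic copy$default$2 accessor changes its result type:
case class Old(execId: String, metrics: Long, updates: Seq[Int])          // copy$default$2: Long
case class New(execId: String, updates: Seq[Int], metrics: Option[Long])  // copy$default$2: Seq[Int]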
