Skip to content

Commit 754b5b8

Browse files
committed
copy taskMetrics only when isLocal is true
1 parent 5ca26dc commit 754b5b8

File tree

3 files changed

+16
-17
lines changed

3 files changed

+16
-17
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,13 @@ private[spark] class Executor(
360360
if (!taskRunner.attemptedTask.isEmpty) {
361361
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
362362
metrics.updateShuffleReadMetrics
363-
tasksMetrics += ((taskRunner.taskId, metrics))
363+
if (isLocal) {
364+
// make a deep copy of it
365+
val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
366+
tasksMetrics += ((taskRunner.taskId, copiedMetrics))
367+
} else {
368+
tasksMetrics += ((taskRunner.taskId, metrics))
369+
}
364370
}
365371
}
366372
}

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import org.apache.spark.scheduler._
2626
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
2727
import org.apache.spark.storage.BlockManagerId
2828
import org.apache.spark.ui.jobs.UIData._
29-
import org.apache.spark.util.Utils
3029

3130
/**
3231
* :: DeveloperApi ::
@@ -242,9 +241,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
242241
updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics,
243242
t.taskMetrics)
244243

245-
// Overwrite task metrics with deepcopy
246-
// TODO: only serialize it in local mode
247-
t.taskMetrics = Some(Utils.deserialize[TaskMetrics](Utils.serialize(taskMetrics)))
244+
// Overwrite task metrics
245+
t.taskMetrics = Some(taskMetrics)
248246
}
249247
}
250248
}

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
144144
val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
145145
val execId = "exe-1"
146146

147-
def updateTaskMetrics(taskMetrics: TaskMetrics, base: Int) = {
147+
def makeTaskMetrics(base: Int) = {
148+
val taskMetrics = new TaskMetrics()
148149
val shuffleReadMetrics = new ShuffleReadMetrics()
149150
val shuffleWriteMetrics = new ShuffleWriteMetrics()
150151
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
@@ -173,16 +174,10 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
173174
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
174175
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
175176

176-
val metrics4 = new TaskMetrics
177-
val metrics5 = new TaskMetrics
178-
val metrics6 = new TaskMetrics
179-
val metrics7 = new TaskMetrics
180177
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
181-
(1234L, 0, 0, updateTaskMetrics(metrics4, 0)))))
182-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
183-
(1235L, 0, 0, updateTaskMetrics(metrics5, 100)))))
184-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
185-
(1236L, 1, 0, updateTaskMetrics(metrics6, 200)))))
178+
(1234L, 0, 0, makeTaskMetrics(0)),
179+
(1235L, 0, 0, makeTaskMetrics(100)),
180+
(1236L, 1, 0, makeTaskMetrics(200)))))
186181

187182
var stage0Data = listener.stageIdToData.get((0, 0)).get
188183
var stage1Data = listener.stageIdToData.get((1, 0)).get
@@ -207,10 +202,10 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
207202

208203
// task that was included in a heartbeat
209204
listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
210-
updateTaskMetrics(metrics4, 300)))
205+
makeTaskMetrics(300)))
211206
// task that wasn't included in a heartbeat
212207
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1),
213-
updateTaskMetrics(metrics7, 400)))
208+
makeTaskMetrics(400)))
214209

215210
stage0Data = listener.stageIdToData.get((0, 0)).get
216211
stage1Data = listener.stageIdToData.get((1, 0)).get

0 commit comments

Comments
 (0)