Commit e988be7

wangyum authored and committed (GitHub Enterprise)
[CARMEL-7385][CARMEL-6381] Remove unnecessary sql metrics for UnionExec (apache#132)
1 parent 343d9d0 commit e988be7

6 files changed: 99 additions & 24 deletions


core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 12 additions & 0 deletions
@@ -296,6 +296,18 @@ package object config {
     .stringConf
     .createWithDefaultString("file,hdfs")

+  private[spark] val EXECUTOR_METRICS_SENT_UPDATED_EXCEPT_FIRST_PART_ENABLED =
+    ConfigBuilder("spark.executor.metrics.send.updated.exceptFirstPart")
+      .doc("Only send updated metrics back to the driver for all tasks except the first " +
+        "partition; the first partition sends back all metrics. Some metrics, such as " +
+        "SQL-related metrics, are needed on the driver side even when they are zero, but " +
+        "having a single partition send back the zero values is enough. This saves a lot " +
+        "of driver memory, especially for union RDDs, which carry many unused metrics " +
+        "for each task.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val EXECUTOR_JAVA_OPTIONS =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS)
       .withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS)
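The new flag defaults to true. For comparison with the old behaviour it can be disabled per application; a minimal sketch (the SparkConf usage, app name, and master below are illustrative and not part of this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative only: turning the flag off makes every task send back all external
    // accumulators again, including zero-valued SQL metrics from every partition.
    val conf = new SparkConf()
      .setAppName("union-metrics-demo")   // hypothetical app name
      .setMaster("local[*]")              // hypothetical master, for a quick local run
      .set("spark.executor.metrics.send.updated.exceptFirstPart", "false")
    val sc = new SparkContext(conf)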

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 25 additions & 19 deletions
@@ -455,8 +455,7 @@ private[spark] class DAGScheduler(
      prevShuffleSize.getAndAdd(currTaskShuffleSize)
    }

-    val event = CompletionEvent(task, reason, result,
-      lightAccumUpdates, lightTaskMetrics, metricPeaks, taskInfo)
+    val event = CompletionEvent(task, reason, result, lightTaskMetrics, metricPeaks, taskInfo)
    val stageOpt = stageIdToStage.get(task.stageId)
    if (stageOpt.isEmpty) {
      // The stage may have already finished when we get this event -- eg. maybe it was a
@@ -483,14 +482,15 @@
              case Some(job) =>
                // Only update the accumulator once for each result task.
                if (!job.finished(rt.outputId)) {
-                  updateAccumulators(event)
+                  updateAccumulators(event.task, lightAccumUpdates, event.taskInfo)
                }
              case None => // Ignore update if task's job has finished.
            }
          case _ =>
-            updateAccumulators(event)
+            updateAccumulators(event.task, lightAccumUpdates, event.taskInfo)
        }
-      case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)
+      case _: ExceptionFailure | _: TaskKilled =>
+        updateAccumulators(event.task, lightAccumUpdates, event.taskInfo)
      case _ =>
    }

@@ -504,7 +504,7 @@

    val taskMetricsForDAG: TaskMetrics = taskMetricsFromAccumulators(accumUpdatesForDAG)
    val eventForDAGScheduler = CompletionEvent(task, reason, result,
-      accumUpdatesForDAG, taskMetricsForDAG, metricPeaks, taskInfo)
+      taskMetricsForDAG, metricPeaks, taskInfo)
    eventProcessLoop.post(eventForDAGScheduler)
  }

@@ -1844,14 +1844,7 @@
      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
      // consistent view of both variables.
      RDDCheckpointData.synchronized {
-        taskBinaryBytes = stage match {
-          case stage: ShuffleMapStage =>
-            JavaUtils.bufferToArray(
-              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-          case stage: ResultStage =>
-            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
-        }
-
+        taskBinaryBytes = serializeTaskBinaries(stage)
        partitions = stage.rdd.partitions
      }
    } catch {
@@ -1912,6 +1905,17 @@
      }
    }

+  private[scheduler] def serializeTaskBinaries(stage: Stage): Array[Byte] = {
+    val taskBinaries = stage match {
+      case stage: ShuffleMapStage =>
+        JavaUtils.bufferToArray(
+          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
+      case stage: ResultStage =>
+        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+    }
+    taskBinaries
+  }
+
  private[scheduler] def handleSubmitMissingTask(missingTask: SubmitMissingTask): Unit = {
    logDebug("submitMissingTasks(" + missingTask.stage + ")")
    if (missingTask.taskBinary == null) {
@@ -2013,11 +2017,13 @@
   * This still doesn't stop the caller from updating the accumulator outside the scheduler,
   * but that's not our problem since there's nothing we can do about that.
   */
-  private def updateAccumulators(event: CompletionEvent): Unit = {
-    val task = event.task
+  private def updateAccumulators(
+      task: Task[_],
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      taskInfo: TaskInfo): Unit = {
    val stage = stageIdToStage(task.stageId)

-    event.accumUpdates.foreach { updates =>
+    accumUpdates.foreach { updates =>
      val id = updates.id
      try {
        // Find the corresponding accumulator on the driver and update it
@@ -2032,8 +2038,8 @@
        // To avoid UI cruft, ignore cases where value wasn't updated
        if (acc.name.isDefined && !updates.isZero) {
          stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
-          event.taskInfo.setAccumulables(
-            acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
+          taskInfo.setAccumulables(
+            acc.toInfo(Some(updates.value), Some(acc.value)) +: taskInfo.accumulables)
        }
      } catch {
        case NonFatal(e) =>

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 1 addition & 2 deletions
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.{AccumulatorV2, CallSite}
+import org.apache.spark.util.CallSite

 /**
  * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
@@ -84,7 +84,6 @@ private[scheduler] case class CompletionEvent(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
-    accumUpdates: Seq[AccumulatorV2[_, _]],
    taskMetrics: TaskMetrics,
    metricPeaks: Array[Long],
    taskInfo: TaskInfo)

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 18 additions & 2 deletions
@@ -22,7 +22,7 @@ import java.util.Properties

 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.config.APP_CALLER_CONTEXT
+import org.apache.spark.internal.config.{APP_CALLER_CONTEXT, EXECUTOR_METRICS_SENT_UPDATED_EXCEPT_FIRST_PART_ENABLED}
 import org.apache.spark.internal.plugin.PluginContainer
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
@@ -210,12 +210,28 @@ private[spark] abstract class Task[T](
      context.taskMetrics.nonZeroInternalAccums() ++
        // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
        // filter them out.
-        context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
+        collectExternalAccumUpdates(context.taskMetrics.externalAccums, taskFailed)
    } else {
      Seq.empty
    }
  }

+  private def collectExternalAccumUpdates(
+      extAccumUpdates: Seq[AccumulatorV2[_, _]], taskFailed: Boolean): Seq[AccumulatorV2[_, _]] = {
+    // Use Option to fix NPE in the test of SPARK-32160
+    val sentOnlyUpdatedMetricsExceptFirstPart = Option(SparkEnv.get)
+      .exists(_.conf.get(EXECUTOR_METRICS_SENT_UPDATED_EXCEPT_FIRST_PART_ENABLED))
+    extAccumUpdates.filter { a =>
+      var filter = !taskFailed || a.countFailedValues
+      // only send all metrics for the first part
+      // and send only updated metrics for other partitions
+      if (sentOnlyUpdatedMetricsExceptFirstPart && partitionId != 0) {
+        filter = filter && !a.isZero
+      }
+      filter
+    }
+  }
+
  /**
   * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
   * code and user code to properly handle the flag. This function should be idempotent so it can
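To spell out the rule collectExternalAccumUpdates applies, here is a minimal, dependency-free sketch of the per-accumulator decision (shouldSendAccumulator is a hypothetical name, and isZero/countFailedValues are passed in as plain booleans purely for illustration):

    // Partition 0 always ships the accumulator, so zero-valued SQL metrics still reach
    // the driver exactly once; other partitions ship it only when it was actually updated.
    def shouldSendAccumulator(
        partitionId: Int,
        taskFailed: Boolean,
        countFailedValues: Boolean,
        isZero: Boolean,
        onlyUpdatedExceptFirstPart: Boolean = true): Boolean = {
      val base = !taskFailed || countFailedValues
      if (onlyUpdatedExceptFirstPart && partitionId != 0) base && !isZero else base
    }

    // A zero-valued metric on partition 3 is filtered out, while the same metric on
    // partition 0 is still sent back to the driver.
    assert(!shouldSendAccumulator(partitionId = 3, taskFailed = false,
      countFailedValues = false, isZero = true))
    assert(shouldSendAccumulator(partitionId = 0, taskFailed = false,
      countFailedValues = false, isZero = true))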

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -4974,7 +4974,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
      } else {
        null
      }
-      CompletionEvent(task, reason, result, allAccumUpdates, taskMetrics, metricPeaks, taskInfo)
+      CompletionEvent(task, reason, result, taskMetrics, metricPeaks, taskInfo)
    }
  }

core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala

Lines changed: 42 additions & 0 deletions
@@ -669,6 +669,48 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
    assert(invocationOrder === Seq("C", "B", "A", "D"))
  }

+  test("Only first partition updated external accumulators will be sent back to driver") {
+    sc = new SparkContext("local", "test")
+    // Create a dummy task. We won't end up running this; we just want to collect
+    // accumulator updates from it.
+    val taskMetrics1 = TaskMetrics.registered
+    val ext1 = new LongAccumulator
+    ext1.register(sc, Some("extAccum1"))
+    taskMetrics1.registerAccumulator(ext1)
+    val task1 = new Task[Int](0, 0, 0, 1, JobArtifactSet.getActiveOrDefault(sc)) {
+      context = new TaskContextImpl(0, 0, 0, 0L, 0, 1,
+        new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+        new Properties,
+        SparkEnv.get.metricsSystem,
+        taskMetrics1)
+
+      override def runTask(tc: TaskContext): Int = 0
+    }
+    val updatedAccums = task1.collectAccumulatorUpdates()
+    assert(updatedAccums.length == 2)
+    assert(updatedAccums(0).name == Some(InternalAccumulator.RESULT_SIZE))
+    assert(updatedAccums(0).value == 0)
+    assert(updatedAccums(1).name == Some("extAccum1"))
+    assert(updatedAccums(1).value == 0)
+
+    val taskMetrics2 = TaskMetrics.registered
+    val ext2 = new LongAccumulator
+    ext2.register(sc, Some("extAccum2"))
+    taskMetrics2.registerAccumulator(ext2)
+    val task2 = new Task[Int](0, 0, 1, 1, JobArtifactSet.getActiveOrDefault(sc)) {
+      context = new TaskContextImpl(0, 0, 1, 0L, 0, 1,
+        new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+        new Properties,
+        SparkEnv.get.metricsSystem,
+        taskMetrics2)
+
+      override def runTask(tc: TaskContext): Int = 0
+    }
+    val updatedAccums2 = task2.collectAccumulatorUpdates()
+    // External accumulators won't be sent back for the second partition
+    // when they are not updated.
+    assert(updatedAccums2.length == 1)
+  }
 }

 private object TaskContextSuite {
