
Commit f16bc68

zsxwing authored and rxin committed
[SPARK-9824] [CORE] Fix the issue that InternalAccumulator leaks WeakReference
`InternalAccumulator.create` doesn't call `registerAccumulatorForCleanup` to register the created accumulators with the ContextCleaner, so the `WeakReference`s for these accumulators in `Accumulators.originals` are never removed. This PR adds a `registerAccumulatorForCleanup` call for internal accumulators to avoid the memory leak. Author: zsxwing <zsxwing@gmail.com> Closes #8108 from zsxwing/internal-accumulators-leak.
1 parent 00c0272 commit f16bc68
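For background, `Accumulators.originals` maps accumulator IDs to `WeakReference`s. The referents can be garbage collected, but the map entries themselves are only dropped when the accumulator was registered with the `ContextCleaner`. The sketch below is a minimal, self-contained illustration of that weak-reference-plus-cleanup pattern; `Acc`, `CleanupRef`, and `MiniCleaner` are toy stand-ins, not Spark's real classes:

import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Toy stand-in for an accumulator; not Spark's Accumulator class.
final class Acc(val id: Long)

// Weak reference that remembers the accumulator's ID so its map entry
// can still be removed after the referent itself has been collected.
final class CleanupRef(acc: Acc, queue: ReferenceQueue[Acc])
    extends WeakReference[Acc](acc, queue) {
  val accId: Long = acc.id
}

object MiniCleaner {
  private val queue = new ReferenceQueue[Acc]
  private val originals = mutable.Map[Long, CleanupRef]()

  // Analogous in spirit to registerAccumulatorForCleanup: without this
  // step the id -> WeakReference entry stays in the map forever, which
  // is the leak SPARK-9824 fixes for internal accumulators.
  def registerForCleanup(acc: Acc): Unit =
    originals.synchronized { originals(acc.id) = new CleanupRef(acc, queue) }

  // Drain references whose referents were garbage collected and drop
  // their map entries, so the registry cannot grow without bound.
  def cleanUp(): Unit = {
    var ref = queue.poll()
    while (ref != null) {
      originals.synchronized { originals -= ref.asInstanceOf[CleanupRef].accId }
      ref = queue.poll()
    }
  }
}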

3 files changed: 16 additions & 11 deletions

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 13 additions & 9 deletions
@@ -382,14 +382,18 @@ private[spark] object InternalAccumulator {
    * add to the same set of accumulators. We do this to report the distribution of accumulator
    * values across all tasks within each stage.
    */
-  def create(): Seq[Accumulator[Long]] = {
-    Seq(
-      // Execution memory refers to the memory used by internal data structures created
-      // during shuffles, aggregations and joins. The value of this accumulator should be
-      // approximately the sum of the peak sizes across all such data structures created
-      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
-      new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
-    ) ++ maybeTestAccumulator.toSeq
+  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
+    val internalAccumulators = Seq(
+      // Execution memory refers to the memory used by internal data structures created
+      // during shuffles, aggregations and joins. The value of this accumulator should be
+      // approximately the sum of the peak sizes across all such data structures created
+      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
+      new Accumulator(
+        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
+    ) ++ maybeTestAccumulator.toSeq
+    internalAccumulators.foreach { accumulator =>
+      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+    }
+    internalAccumulators
   }
 }
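Note the `sc.cleaner.foreach(...)` shape: `SparkContext.cleaner` is an `Option[ContextCleaner]`, since the cleaner may be absent (for example when reference tracking is disabled via `spark.cleaner.referenceTracking`), so registration silently becomes a no-op when there is no cleaner. A minimal sketch of that Option guard, using toy types rather than Spark's:

object OptionGuardDemo extends App {
  // Toy types (not Spark's) illustrating the Option-guarded registration.
  final class Cleaner {
    def registerAccumulatorForCleanup(id: Long): Unit =
      println(s"registered accumulator $id for cleanup")
  }
  final class Ctx(val cleaner: Option[Cleaner])

  val tracking   = new Ctx(Some(new Cleaner))
  val noTracking = new Ctx(None) // e.g. reference tracking disabled

  Seq(1L, 2L).foreach { id =>
    tracking.cleaner.foreach(_.registerAccumulatorForCleanup(id))   // registers
    noTracking.cleaner.foreach(_.registerAccumulatorForCleanup(id)) // no-op: no cleaner
  }
}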

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 1 addition & 1 deletion
@@ -81,7 +81,7 @@ private[spark] abstract class Stage(
    * accumulators here again will override partial values from the finished tasks.
    */
   def resetInternalAccumulators(): Unit = {
-    _internalAccumulators = InternalAccumulator.create()
+    _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
   }
 
   /**
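`Stage` does not hold a `SparkContext` field of its own; it reaches the context through its `rdd`, because `RDD.sparkContext` returns the context the RDD was created with. A small standalone example of that accessor (the app name is made up for the demo):

import org.apache.spark.SparkContext

object RddContextDemo extends App {
  val sc  = new SparkContext("local", "rdd-context-demo")
  val rdd = sc.parallelize(1 to 10)
  // RDD.sparkContext hands back the context the RDD was created with,
  // which is how Stage reaches the ContextCleaner in this commit.
  assert(rdd.sparkContext eq sc)
  sc.stop()
}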

core/src/test/scala/org/apache/spark/AccumulatorSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -160,7 +160,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
   }
 
   test("internal accumulators in TaskContext") {
-    val accums = InternalAccumulator.create()
+    sc = new SparkContext("local", "test")
+    val accums = InternalAccumulator.create(sc)
     val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
     val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
     val collectedInternalAccums = taskContext.collectInternalAccumulators()
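Because `create` now touches the `ContextCleaner`, the test needs a live context, so it assigns one to the `sc` field supplied by the `LocalSparkContext` mixin, which stops the context after each test. A simplified sketch of what such a mixin does (the real trait lives in Spark's test sources and differs in detail):

import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterEach, Suite}

// Simplified LocalSparkContext-style mixin: tests assign sc, and the
// trait guarantees the context is stopped after each test runs.
trait MiniLocalSparkContext extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override def afterEach(): Unit = {
    try {
      if (sc != null) {
        sc.stop()
        sc = null
      }
    } finally {
      super.afterEach()
    }
  }
}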
