Commit 7389446

patrickbrownsync authored and Marcelo Vanzin committed
[SPARK-25837][CORE] Fix potential slowdown in AppStatusListener when cleaning up stages
## What changes were proposed in this pull request?

* Update `AppStatusListener` `cleanupStages` method to remove tasks for those stages in a single pass instead of 1 for each stage.
* This fixes an issue where the cleanupStages method would get backed up, causing a backup in the executor in ElementTrackingStore, resulting in stages and jobs not getting cleaned up properly.

Tasks seem most susceptible to this as there are a lot of them, however a similar issue could arise in other locations the `KVStore` `view` method is used. A broader fix might involve updates to `KVStoreView` and `InMemoryView` as it appears this interface and implementation can lead to multiple and inefficient traversals of the stored data.

## How was this patch tested?

Using existing tests in AppStatusListenerSuite

This is my original work and I license the work to the project under the project’s open source license.

Closes #22883 from patrickbrownsync/cleanup-stages-fix.

Authored-by: Patrick Brown <patrick.brown@blyncsy.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit e9d3ca0)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 3d2fce5 commit 7389446

1 file changed, +9 -10 lines changed
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 9 additions & 10 deletions
@@ -1002,16 +1002,6 @@ private[spark] class AppStatusListener(
         kvstore.delete(e.getClass(), e.id)
       }
 
-      val tasks = kvstore.view(classOf[TaskDataWrapper])
-        .index("stage")
-        .first(key)
-        .last(key)
-        .asScala
-
-      tasks.foreach { t =>
-        kvstore.delete(t.getClass(), t.taskId)
-      }
-
       // Check whether there are remaining attempts for the same stage. If there aren't, then
       // also delete the RDD graph data.
       val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
@@ -1034,6 +1024,15 @@ private[spark] class AppStatusListener(
 
       cleanupCachedQuantiles(key)
     }
+
+    // Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
+    val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala
+    val keys = stages.map { s => (s.info.stageId, s.info.attemptId) }.toSet
+    tasks.foreach { t =>
+      if (keys.contains((t.stageId, t.stageAttemptId))) {
+        kvstore.delete(t.getClass(), t.taskId)
+      }
+    }
   }
 
   private def cleanupTasks(stage: LiveStage): Unit = {
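
The effect of the change can be illustrated with a small, self-contained sketch. It does not use Spark's `KVStore`; `Task`, `ToyStore`, and `CleanupSketch` are hypothetical names invented for this illustration. The toy store assumes, following the commit message's remark about inefficient traversals, that every `view` call walks all stored tasks, and it counts the records visited so the per-stage and single-pass strategies can be compared.

```scala
import scala.collection.mutable

// Hypothetical stand-in for TaskDataWrapper: only the fields the cleanup needs.
final case class Task(taskId: Long, stageId: Int, stageAttemptId: Int)

// Hypothetical toy store, NOT Spark's KVStore. It counts how many records
// each traversal touches so the two strategies can be compared.
final class ToyStore(initial: Seq[Task]) {
  private val tasks = mutable.LinkedHashMap(initial.map(t => t.taskId -> t): _*)
  var visited: Long = 0L

  // Assumption for this sketch: each view walks every stored task.
  def view: Seq[Task] = {
    val snapshot = tasks.values.toList
    visited += snapshot.size
    snapshot
  }

  def delete(taskId: Long): Unit = { tasks.remove(taskId); () }
  def size: Int = tasks.size
}

object CleanupSketch {
  // Old shape: one traversal of the store for each stage being cleaned up.
  def perStage(store: ToyStore, stages: Seq[(Int, Int)]): Unit =
    stages.foreach { key =>
      store.view
        .filter(t => (t.stageId, t.stageAttemptId) == key)
        .foreach(t => store.delete(t.taskId))
    }

  // New shape: collect the stage keys into a set, then traverse the store once.
  def singlePass(store: ToyStore, stages: Seq[(Int, Int)]): Unit = {
    val keys = stages.toSet
    store.view.foreach { t =>
      if (keys.contains((t.stageId, t.stageAttemptId))) store.delete(t.taskId)
    }
  }
}
```

Under these assumptions the per-stage variant's traversal cost grows with the number of stages cleaned up, while the single-pass variant visits each stored task once regardless of how many stages are removed. The trade-off is that the single pass always scans every task, even when only one stage is being cleaned.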
