File tree Expand file tree Collapse file tree 2 files changed +17
-6
lines changed
core/src/main/scala/org/apache/spark Expand file tree Collapse file tree 2 files changed +17
-6
lines changed Original file line number Diff line number Diff line change @@ -28,9 +28,13 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
2828 @ GET
2929 def executorList (): Seq [ExecutorSummary ] = {
3030 val listener = ui.executorsListener
31- val storageStatusList = listener.storageStatusList
32- (0 until storageStatusList.size).map { statusId =>
33- ExecutorsPage .getExecInfo(listener, statusId)
31+ listener.synchronized {
32+ // The follow codes should be protected by `listener` to make sure no executors will be
33+ // removed before we query their status. See SPARK-12784.
34+ val storageStatusList = listener.storageStatusList
35+ (0 until storageStatusList.size).map { statusId =>
36+ ExecutorsPage .getExecInfo(listener, statusId)
37+ }
3438 }
3539 }
3640}
Original file line number Diff line number Diff line change @@ -52,12 +52,19 @@ private[ui] class ExecutorsPage(
5252 private val listener = parent.listener
5353
5454 def render (request : HttpServletRequest ): Seq [Node ] = {
55- val storageStatusList = listener.storageStatusList
55+ val (storageStatusList, execInfo) = listener.synchronized {
56+ // The follow codes should be protected by `listener` to make sure no executors will be
57+ // removed before we query their status. See SPARK-12784.
58+ val _storageStatusList = listener.storageStatusList
59+ val _execInfo = {
60+ for (statusId <- 0 until _storageStatusList.size)
61+ yield ExecutorsPage .getExecInfo(listener, statusId)
62+ }
63+ (_storageStatusList, _execInfo)
64+ }
5665 val maxMem = storageStatusList.map(_.maxMem).sum
5766 val memUsed = storageStatusList.map(_.memUsed).sum
5867 val diskUsed = storageStatusList.map(_.diskUsed).sum
59- val execInfo = for (statusId <- 0 until storageStatusList.size) yield
60- ExecutorsPage .getExecInfo(listener, statusId)
6168 val execInfoSorted = execInfo.sortBy(_.id)
6269 val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
6370
You can’t perform that action at this time.
0 commit comments