
Commit b5bb9a2

Merge remote-tracking branch 'upstream/master' into SPARK-32444
# Conflicts:
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q33.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q33/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q5.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q5/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q56.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q56/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q60.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q60/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q70.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q70/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q83.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q83/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q93.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q93/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q35.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q35/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q35a.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q35a/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q70a.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q70a/explain.txt
#   sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
2 parents: c436bc4 + 5775073

File tree

461 files changed: +6918 -12828 lines changed

.github/workflows/build_and_test.yml

Lines changed: 20 additions & 6 deletions
@@ -7,6 +7,11 @@ on:
   pull_request:
     branches:
     - master
+  workflow_dispatch:
+    inputs:
+      target:
+        description: 'Target branch to run'
+        required: true
 
 jobs:
   # Build: build Spark and run the tests for specified modules.
@@ -82,18 +87,26 @@ jobs:
       # GitHub Actions' default miniconda to use in pip packaging test.
       CONDA_PREFIX: /usr/share/miniconda
       GITHUB_PREV_SHA: ${{ github.event.before }}
+      GITHUB_INPUT_BRANCH: ${{ github.event.inputs.target }}
     steps:
     - name: Checkout Spark repository
       uses: actions/checkout@v2
       # In order to fetch changed files
       with:
         fetch-depth: 0
+    - name: Merge dispatched input branch
+      if: ${{ github.event.inputs.target != '' }}
+      run: git merge --progress --ff-only origin/${{ github.event.inputs.target }}
     # Cache local repositories. Note that GitHub Actions cache has a 2G limit.
     - name: Cache Scala, SBT, Maven and Zinc
-      uses: actions/cache@v1
+      uses: actions/cache@v2
       with:
-        path: build
-        key: build-${{ hashFiles('**/pom.xml') }}
+        path: |
+          build/apache-maven-*
+          build/zinc-*
+          build/scala-*
+          build/*.jar
+        key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
        restore-keys: |
          build-
     - name: Cache Maven local repository
@@ -107,7 +120,7 @@ jobs:
       uses: actions/cache@v2
       with:
         path: ~/.ivy2/cache
-        key: ${{ matrix.java }}-${{ matrix.hadoop }}-ivy-${{ hashFiles('**/pom.xml') }}-${{ hashFiles('**/plugins.sbt') }}
+        key: ${{ matrix.java }}-${{ matrix.hadoop }}-ivy-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
       restore-keys: |
         ${{ matrix.java }}-${{ matrix.hadoop }}-ivy-
     - name: Install JDK ${{ matrix.java }}
@@ -217,7 +230,7 @@ jobs:
       run: |
         # TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
         # See also https://github.com/sphinx-doc/sphinx/issues/7551.
-        pip3 install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme
+        pip3 install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme ipython nbsphinx
     - name: Install R 4.0
       run: |
         sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list"
@@ -236,10 +249,11 @@ jobs:
         ruby-version: 2.7
     - name: Install dependencies for documentation generation
       run: |
+        # pandoc is required to generate PySpark APIs as well in nbsphinx.
         sudo apt-get install -y libcurl4-openssl-dev pandoc
         # TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
         # See also https://github.com/sphinx-doc/sphinx/issues/7551.
-        pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme
+        pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme ipython nbsphinx
         gem install jekyll jekyll-redirect-from rouge
         sudo Rscript -e "install.packages(c('devtools', 'testthat', 'knitr', 'rmarkdown', 'roxygen2'), repos='https://cloud.r-project.org/')"
     - name: Scala linter
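
The widened cache key above combines the hashes of every listed build file, so the cache is invalidated as soon as any one of them changes. A rough Scala analogue of what a multi-file hashFiles() key computes; illustrative only, the file paths are placeholders:

import java.nio.file.{Files, Paths}
import java.security.MessageDigest

// Derive one cache key from the contents of several build files, so the key
// changes whenever any one of the files changes.
def buildCacheKey(paths: Seq[String]): String = {
  val md = MessageDigest.getInstance("SHA-256")
  paths.sorted.foreach(p => md.update(Files.readAllBytes(Paths.get(p))))
  "build-" + md.digest().map("%02x".format(_)).mkString
}

// e.g. buildCacheKey(Seq("pom.xml", "project/build.properties", "build/mvn"))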

binder/apt.txt

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+openjdk-8-jre

binder/postBuild

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is used for Binder integration to install PySpark available in
+# Jupyter notebook.
+
+VERSION=$(python -c "exec(open('python/pyspark/version.py').read()); print(__version__)")
+pip install "pyspark[sql,ml,mllib]<=$VERSION"

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 1 addition & 1 deletion
@@ -290,7 +290,7 @@ public boolean sharedByteBufAllocators() {
   }
 
   /**
-   * If enabled then off-heap byte buffers will be prefered for the shared ByteBuf allocators.
+   * If enabled then off-heap byte buffers will be preferred for the shared ByteBuf allocators.
    */
   public boolean preferDirectBufsForSharedByteBufAllocators() {
     return conf.getBoolean("spark.network.io.preferDirectBufs", true);

core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ public interface DriverPlugin {
    * initialization.
    * <p>
    * It's recommended that plugins be careful about what operations are performed in this call,
-   * preferrably performing expensive operations in a separate thread, or postponing them until
+   * preferably performing expensive operations in a separate thread, or postponing them until
    * the application has fully started.
    *
    * @param sc The SparkContext loading the plugin.

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 1 addition & 1 deletion
@@ -88,7 +88,7 @@ private[spark] trait ExecutorAllocationClient {
   * Default implementation delegates to kill, scheduler must override
   * if it supports graceful decommissioning.
   *
-  * @param executorsAndDecominfo identifiers of executors & decom info.
+  * @param executorsAndDecomInfo identifiers of executors & decom info.
   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
   *                                 after these executors have been decommissioned.
   * @return the ids of the executors acknowledged by the cluster manager to be removed.

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 4 additions & 1 deletion
@@ -279,6 +279,9 @@ private[spark] class ExecutorAllocationManager(
     numExecutorsTargetPerResourceProfileId.keys.foreach { rpId =>
       numExecutorsTargetPerResourceProfileId(rpId) = initialNumExecutors
     }
+    numExecutorsToAddPerResourceProfileId.keys.foreach { rpId =>
+      numExecutorsToAddPerResourceProfileId(rpId) = 1
+    }
     executorMonitor.reset()
   }
 
@@ -595,7 +598,7 @@ private[spark] class ExecutorAllocationManager(
     // reset the newExecutorTotal to the existing number of executors
     if (testing || executorsRemoved.nonEmpty) {
       if (decommissionEnabled) {
-        executorMonitor.executorsDecommissioned(executorsRemoved)
+        executorMonitor.executorsDecommissioned(executorsRemoved.toSeq)
       } else {
         executorMonitor.executorsKilled(executorsRemoved.toSeq)
       }
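
The first hunk above resets the per-ResourceProfile ramp-up counter alongside the target counts, so a later scale-up starts again from step 1 instead of a stale value. A minimal standalone sketch of that invariant; the class itself is illustrative, not Spark's:

import scala.collection.mutable

// On reset, both the per-ResourceProfile target and the additive ramp-up
// counter must return to their initial values.
class AllocationCounters(initialNumExecutors: Int) {
  val numExecutorsTargetPerResourceProfileId = mutable.HashMap[Int, Int]()
  val numExecutorsToAddPerResourceProfileId = mutable.HashMap[Int, Int]()

  def reset(): Unit = {
    numExecutorsTargetPerResourceProfileId.keys.foreach { rpId =>
      numExecutorsTargetPerResourceProfileId(rpId) = initialNumExecutors
    }
    numExecutorsToAddPerResourceProfileId.keys.foreach { rpId =>
      numExecutorsToAddPerResourceProfileId(rpId) = 1
    }
  }
}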

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala

Lines changed: 3 additions & 2 deletions
@@ -33,8 +33,9 @@ private class HistoryServerMemoryManager(
     conf: SparkConf) extends Logging {
 
   private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
-  private val currentUsage = new AtomicLong(0L)
-  private val active = new HashMap[(String, Option[String]), Long]()
+  // Visible for testing.
+  private[history] val currentUsage = new AtomicLong(0L)
+  private[history] val active = new HashMap[(String, Option[String]), Long]()
 
   def initialize(): Unit = {
     logInfo("Initialized memory manager: " +
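
Both fields are widened from private to private[history] so tests in the same package can inspect them. A minimal sketch of this Scala visibility idiom; UsageTracker is a made-up class, not Spark's:

package org.apache.spark.deploy.history

import java.util.concurrent.atomic.AtomicLong

// `private[history]` hides the field from the rest of the codebase while
// keeping it readable from tests that live in the same package.
class UsageTracker {
  private[history] val currentUsage = new AtomicLong(0L)
  def record(bytes: Long): Long = currentUsage.addAndGet(bytes)
}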

core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala

Lines changed: 4 additions & 2 deletions
@@ -54,7 +54,8 @@ private[history] class HybridStore extends KVStore {
   private var backgroundThread: Thread = null
 
   // A hash map that stores all classes that had been writen to inMemoryStore
-  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+  // Visible for testing
+  private[history] val klassMap = new ConcurrentHashMap[Class[_], Boolean]
 
   override def getMetadata[T](klass: Class[T]): T = {
     getStore().getMetadata(klass)
@@ -165,8 +166,9 @@ private[history] class HybridStore extends KVStore {
 
   /**
    * This method return the store that we should use.
+   * Visible for testing.
    */
-  private def getStore(): KVStore = {
+  private[history] def getStore(): KVStore = {
     if (shouldUseInMemoryStore.get) {
       inMemoryStore
     } else {
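
getStore() is the switch point between the in-memory store and the disk-backed store, and widening it to private[history] lets tests observe which side is active. A simplified, self-contained sketch of the underlying pattern; Store and HybridSketch are stand-ins, not Spark's KVStore API:

import java.util.concurrent.atomic.AtomicBoolean

// Serve reads from a fast in-memory store until a background migration to a
// disk-backed store completes, then atomically flip to the disk store.
trait Store { def read(key: String): Option[String] }

class HybridSketch(inMemoryStore: Store, diskStore: Store) {
  private val useInMemory = new AtomicBoolean(true)

  // Called by the background thread once everything is written to disk.
  def switchToDisk(): Unit = useInMemory.set(false)

  private def currentStore: Store =
    if (useInMemory.get) inMemoryStore else diskStore

  def read(key: String): Option[String] = currentStore.read(key)
}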

core/src/main/scala/org/apache/spark/resource/ResourceDiscoveryScriptPlugin.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ import org.apache.spark.util.Utils.executeAndGetOutput
 /**
  * The default plugin that is loaded into a Spark application to control how custom
  * resources are discovered. This executes the discovery script specified by the user
- * and gets the json output back and contructs ResourceInformation objects from that.
+ * and gets the json output back and constructs ResourceInformation objects from that.
  * If the user specifies custom plugins, this is the last one to be executed and
  * throws if the resource isn't discovered.
 *

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
@@ -1825,7 +1825,7 @@ private[spark] class DAGScheduler(
     if (bmAddress != null) {
       val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
       val isHostDecommissioned = taskScheduler
-        .getExecutorDecommissionInfo(bmAddress.executorId)
+        .getExecutorDecommissionState(bmAddress.executorId)
         .exists(_.isHostDecommissioned)
 
       // Shuffle output of all executors on host `bmAddress.host` may be lost if:

core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala

Lines changed: 13 additions & 1 deletion
@@ -18,11 +18,23 @@
 package org.apache.spark.scheduler
 
 /**
- * Provides more detail when an executor is being decommissioned.
+ * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
  * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
  *                             being decommissioned too. Used to infer if the shuffle data might
  *                             be lost even if the external shuffle service is enabled.
  */
 private[spark]
 case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+
+/**
+ * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
+ * from the info message above but it is kept distinct to allow the state to evolve independently
+ * from the message.
+ */
+case class ExecutorDecommissionState(
+    // Timestamp the decommissioning commenced as per the Driver's clock,
+    // to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL
+    // is configured.
+    startTime: Long,
+    isHostDecommissioned: Boolean)
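
The startTime field exists so the scheduler can estimate when a decommissioning executor will actually disappear. A small sketch of that arithmetic; DecomState and killIntervalMs are illustrative stand-ins for ExecutorDecommissionState and EXECUTOR_DECOMMISSION_KILL_INTERVAL, not Spark's actual wiring:

// Combine the recorded decommission start time with a configured kill
// interval to estimate when the executor will be lost.
case class DecomState(startTime: Long, isHostDecommissioned: Boolean)

def estimatedLossTime(state: DecomState, killIntervalMs: Long): Long =
  state.startTime + killIntervalMs

// Example: decommission began at t=1000ms and executors are killed 60s later,
// so estimatedLossTime(DecomState(1000L, false), 60000L) == 61000L.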

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ private[spark] class ShuffleMapStage(
   * Partitions that either haven't yet been computed, or that were computed on an executor
   * that has since been lost, so should be re-computed. This variable is used by the
   * DAGScheduler to determine when a stage has completed. Task successes in both the active
-  * attempt for the stage or in earlier attempts for this stage can cause paritition ids to get
+  * attempt for the stage or in earlier attempts for this stage can cause partition ids to get
   * removed from pendingPartitions. As a result, this variable may be inconsistent with the pending
   * tasks in the TaskSetManager for the active attempt for the stage (the partitions stored here
   * will always be a subset of the partitions that the TaskSetManager thinks are pending).

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ private[spark] trait TaskScheduler {
   /**
    * If an executor is decommissioned, return its corresponding decommission info
    */
-  def getExecutorDecommissionInfo(executorId: String): Option[ExecutorDecommissionInfo]
+  def getExecutorDecommissionState(executorId: String): Option[ExecutorDecommissionState]
 
   /**
    * Process a lost executor

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 33 additions & 19 deletions
@@ -141,7 +141,7 @@ private[spark] class TaskSchedulerImpl(
 
   // We add executors here when we first get decommission notification for them. Executors can
   // continue to run even after being asked to decommission, but they will eventually exit.
-  val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
+  val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]
 
   // When they exit and we know of that via heartbeat failure, we will add them to this cache.
   // This cache is consulted to know if a fetch failure is because a source executor was
@@ -152,7 +152,7 @@ private[spark] class TaskSchedulerImpl(
     .ticker(new Ticker{
       override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
     })
-    .build[String, ExecutorDecommissionInfo]()
+    .build[String, ExecutorDecommissionState]()
     .asMap()
 
   def runningTasksByExecutors: Map[String, Int] = synchronized {
@@ -293,7 +293,7 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] def createTaskSetManager(
       taskSet: TaskSet,
       maxTaskFailures: Int): TaskSetManager = {
-    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
+    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
   }
 
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -922,22 +922,36 @@ private[spark] class TaskSchedulerImpl(
     synchronized {
       // Don't bother noting decommissioning for executors that we don't know about
       if (executorIdToHost.contains(executorId)) {
-        // The scheduler can get multiple decommission updates from multiple sources,
-        // and some of those can have isHostDecommissioned false. We merge them such that
-        // if we heard isHostDecommissioned ever true, then we keep that one since it is
-        // most likely coming from the cluster manager and thus authoritative
-        val oldDecomInfo = executorsPendingDecommission.get(executorId)
-        if (!oldDecomInfo.exists(_.isHostDecommissioned)) {
-          executorsPendingDecommission(executorId) = decommissionInfo
+        val oldDecomStateOpt = executorsPendingDecommission.get(executorId)
+        val newDecomState = if (oldDecomStateOpt.isEmpty) {
+          // This is the first time we are hearing of decommissioning this executor,
+          // so create a brand new state.
+          ExecutorDecommissionState(
+            clock.getTimeMillis(),
+            decommissionInfo.isHostDecommissioned)
+        } else {
+          val oldDecomState = oldDecomStateOpt.get
+          if (!oldDecomState.isHostDecommissioned && decommissionInfo.isHostDecommissioned) {
+            // Only the cluster manager is allowed to send decommission messages with
+            // isHostDecommissioned set. So the new decommissionInfo is from the cluster
+            // manager and is thus authoritative. Flip isHostDecommissioned to true but keep the old
+            // decommission start time.
+            ExecutorDecommissionState(
+              oldDecomState.startTime,
+              isHostDecommissioned = true)
+          } else {
+            oldDecomState
+          }
         }
+        executorsPendingDecommission(executorId) = newDecomState
       }
     }
     rootPool.executorDecommission(executorId)
     backend.reviveOffers()
   }
 
-  override def getExecutorDecommissionInfo(executorId: String)
-    : Option[ExecutorDecommissionInfo] = synchronized {
+  override def getExecutorDecommissionState(executorId: String)
+    : Option[ExecutorDecommissionState] = synchronized {
     executorsPendingDecommission
       .get(executorId)
       .orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
@@ -948,14 +962,14 @@ private[spark] class TaskSchedulerImpl(
     val reason = givenReason match {
       // Handle executor process loss due to decommissioning
       case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
-        val executorDecommissionInfo = getExecutorDecommissionInfo(executorId)
+        val executorDecommissionState = getExecutorDecommissionState(executorId)
         ExecutorProcessLost(
           message,
           // Also mark the worker lost if we know that the host was decommissioned
-          origWorkerLost || executorDecommissionInfo.exists(_.isHostDecommissioned),
+          origWorkerLost || executorDecommissionState.exists(_.isHostDecommissioned),
           // Executor loss is certainly not caused by app if we knew that this executor is being
           // decommissioned
-          causedByApp = executorDecommissionInfo.isEmpty && origCausedByApp)
+          causedByApp = executorDecommissionState.isEmpty && origCausedByApp)
       case e => e
     }
 
@@ -1047,8 +1061,8 @@ private[spark] class TaskSchedulerImpl(
     }
 
 
-    val decomInfo = executorsPendingDecommission.remove(executorId)
-    decomInfo.foreach(decommissionedExecutorsRemoved.put(executorId, _))
+    val decomState = executorsPendingDecommission.remove(executorId)
+    decomState.foreach(decommissionedExecutorsRemoved.put(executorId, _))
 
     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
@@ -1085,12 +1099,12 @@ private[spark] class TaskSchedulerImpl(
 
   // exposed for test
   protected final def isExecutorDecommissioned(execId: String): Boolean =
-    getExecutorDecommissionInfo(execId).nonEmpty
+    getExecutorDecommissionState(execId).isDefined
 
   // exposed for test
   protected final def isHostDecommissioned(host: String): Boolean = {
     hostToExecutors.get(host).exists { executors =>
-      executors.exists(e => getExecutorDecommissionInfo(e).exists(_.isHostDecommissioned))
+      executors.exists(e => getExecutorDecommissionState(e).exists(_.isHostDecommissioned))
     }
   }
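
The rewritten executorDecommission() encodes a merge rule: the first notification fixes the start time, and later notifications may only upgrade isHostDecommissioned from false to true, since host-level messages come from the authoritative cluster manager. A standalone sketch of that rule; DecomState stands in for ExecutorDecommissionState:

// Merge an incoming decommission notice into any previously recorded state.
case class DecomState(startTime: Long, isHostDecommissioned: Boolean)

def mergeDecomState(
    old: Option[DecomState],
    incomingHostDecom: Boolean,
    nowMs: Long): DecomState = old match {
  // First notice: record the start time as of now.
  case None => DecomState(nowMs, incomingHostDecom)
  // Host-level upgrade: flip the flag but keep the original start time.
  case Some(prev) if !prev.isHostDecommissioned && incomingHostDecom =>
    prev.copy(isHostDecommissioned = true)
  // Anything else is a duplicate or weaker notice; keep what we have.
  case Some(prev) => prev
}

// Example: a later host-level notice upgrades the state but keeps startTime:
// mergeDecomState(Some(DecomState(1000L, false)), incomingHostDecom = true, nowMs = 5000L)
//   == DecomState(1000L, true)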
