
Commit 16b7376 (2 parents: 2467732 + 0678afe)

Merge branch 'master' into SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up

210 files changed: +6158 −3514 lines


.github/workflows/master.yml

Lines changed: 18 additions & 14 deletions
@@ -117,38 +117,42 @@ jobs:
         java-version: ${{ matrix.java }}
     # PySpark
     - name: Install PyPy3
-      # SQL component also has Python related tests, for example, IntegratedUDFTestUtils.
       # Note that order of Python installations here matters because default python3 is
       # overridden by pypy3.
       uses: actions/setup-python@v2
-      if: contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
+      if: contains(matrix.modules, 'pyspark')
       with:
         python-version: pypy3
         architecture: x64
-    - name: Install Python 2.7
+    - name: Install Python 3.6
       uses: actions/setup-python@v2
-      if: contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
+      if: contains(matrix.modules, 'pyspark')
       with:
-        python-version: 2.7
+        python-version: 3.6
         architecture: x64
-    - name: Install Python 3.6
+    - name: Install Python 3.8
       uses: actions/setup-python@v2
-      # Yarn has a Python specific test too, for example, YarnClusterSuite.
+      # We should install one Python that is higher then 3+ for SQL and Yarn because:
+      # - SQL component also has Python related tests, for example, IntegratedUDFTestUtils.
+      # - Yarn has a Python specific test too, for example, YarnClusterSuite.
       if: contains(matrix.modules, 'yarn') || contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
       with:
-        python-version: 3.6
+        python-version: 3.8
         architecture: x64
-    - name: Install Python packages
-      if: contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
+    - name: Install Python packages (Python 3.6 and PyPy3)
+      if: contains(matrix.modules, 'pyspark')
       # PyArrow is not supported in PyPy yet, see ARROW-2651.
      # TODO(SPARK-32247): scipy installation with PyPy fails for an unknown reason.
       run: |
-        python3 -m pip install numpy pyarrow pandas scipy
-        python3 -m pip list
-        python2 -m pip install numpy pyarrow pandas scipy
-        python2 -m pip list
+        python3.6 -m pip install numpy pyarrow pandas scipy
+        python3.6 -m pip list
         pypy3 -m pip install numpy pandas
         pypy3 -m pip list
+    - name: Install Python packages (Python 3.8)
+      if: contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
+      run: |
+        python3.8 -m pip install numpy pyarrow pandas scipy
+        python3.8 -m pip list
     # SparkR
     - name: Install R 3.6
       uses: r-lib/actions/setup-r@v1

LICENSE

Lines changed: 1 addition & 1 deletion
@@ -229,7 +229,7 @@ BSD 3-Clause
 ------------
 
 python/lib/py4j-*-src.zip
-python/pyspark/cloudpickle.py
+python/pyspark/cloudpickle/*.py
 python/pyspark/join.py
 core/src/main/resources/org/apache/spark/ui/static/d3.min.js

R/pkg/tests/fulltests/test_context.R

Lines changed: 1 addition & 1 deletion
@@ -139,7 +139,7 @@ test_that("utility function can be called", {
   expect_true(TRUE)
 })
 
-test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
+test_that("getClientModeSparkSubmitOpts() returns spark-submit args from allowList", {
   e <- new.env()
   e[["spark.driver.memory"]] <- "512m"
   ops <- getClientModeSparkSubmitOpts("sparkrmain", e)

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 4 additions & 4 deletions
@@ -3921,14 +3921,14 @@ test_that("No extra files are created in SPARK_HOME by starting session and maki
   # before creating a SparkSession with enableHiveSupport = T at the top of this test file
   # (filesBefore). The test here is to compare that (filesBefore) against the list of files before
   # any test is run in run-all.R (sparkRFilesBefore).
-  # sparkRWhitelistSQLDirs is also defined in run-all.R, and should contain only 2 whitelisted dirs,
+  # sparkRAllowedSQLDirs is also defined in run-all.R, and should contain only 2 allowed dirs,
   # here allow the first value, spark-warehouse, in the diff, everything else should be exactly the
   # same as before any test is run.
-  compare_list(sparkRFilesBefore, setdiff(filesBefore, sparkRWhitelistSQLDirs[[1]]))
+  compare_list(sparkRFilesBefore, setdiff(filesBefore, sparkRAllowedSQLDirs[[1]]))
   # third, ensure only spark-warehouse and metastore_db are created when enableHiveSupport = T
   # note: as the note above, after running all tests in this file while enableHiveSupport = T, we
-  # check the list of files again. This time we allow both whitelisted dirs to be in the diff.
-  compare_list(sparkRFilesBefore, setdiff(filesAfter, sparkRWhitelistSQLDirs))
+  # check the list of files again. This time we allow both dirs to be in the diff.
+  compare_list(sparkRFilesBefore, setdiff(filesAfter, sparkRAllowedSQLDirs))
 })
 
 unlink(parquetPath)

R/pkg/tests/run-all.R

Lines changed: 2 additions & 2 deletions
@@ -35,8 +35,8 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
   install.spark(overwrite = TRUE)
 
   sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
-  sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
-  invisible(lapply(sparkRWhitelistSQLDirs,
+  sparkRAllowedSQLDirs <- c("spark-warehouse", "metastore_db")
+  invisible(lapply(sparkRAllowedSQLDirs,
              function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)}))
   sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)

common/network-common/src/main/java/org/apache/spark/network/crypto/README.md

Lines changed: 1 addition & 1 deletion
@@ -155,4 +155,4 @@ server will be able to understand. This will cause the server to close the conne
 attacker tries to send any command to the server. The attacker can just hold the channel open for
 some time, which will be closed when the server times out the channel. These issues could be
 separately mitigated by adding a shorter timeout for the first message after authentication, and
-potentially by adding host blacklists if a possible attack is detected from a particular host.
+potentially by adding host reject-lists if a possible attack is detected from a particular host.
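As a purely illustrative aside (not part of this commit), the "shorter timeout for the first message after authentication" mitigation that this README paragraph suggests could be sketched as below; the FirstMessageWatchdog name and the closeChannel callback are invented for the example and do not exist in network-common.

    // Hypothetical sketch only: close the channel if no real message follows
    // authentication within a deadline, so an attacker cannot hold it open.
    import java.util.concurrent.{Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicBoolean

    class FirstMessageWatchdog(timeoutMs: Long, closeChannel: () => Unit) {
      private val scheduler = Executors.newSingleThreadScheduledExecutor()
      private val received = new AtomicBoolean(false)

      // Arm right after authentication succeeds.
      def arm(): Unit = {
        scheduler.schedule(new Runnable {
          override def run(): Unit = if (!received.get()) closeChannel()
        }, timeoutMs, TimeUnit.MILLISECONDS)
      }

      // Call when the first post-authentication message is decoded successfully.
      def onFirstMessage(): Unit = received.set(true)
    }
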
Lines changed: 12 additions & 7 deletions
@@ -15,13 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming.continuous.shuffle
+package org.apache.spark.status.api.v1;
 
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.EnumUtil;
 
-/**
- * Trait for writing to a continuous processing shuffle.
- */
-trait ContinuousShuffleWriter {
-  def write(epoch: Iterator[UnsafeRow]): Unit
+public enum TaskStatus {
+  RUNNING,
+  KILLED,
+  FAILED,
+  SUCCESS,
+  UNKNOWN;
+
+  public static TaskStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(TaskStatus.class, str);
+  }
 }
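A quick usage sketch (not part of the commit): because fromString delegates to EnumUtil.parseIgnoreCase, status strings coming from REST query parameters resolve to the enum constants regardless of case.

    // Minimal sketch, assuming the TaskStatus enum above is on the classpath.
    import org.apache.spark.status.api.v1.TaskStatus

    object TaskStatusSketch {
      def main(args: Array[String]): Unit = {
        // The lookup is case-insensitive, so all three strings resolve.
        val parsed = Seq("running", "Killed", "FAILED").map(TaskStatus.fromString)
        parsed.foreach(println)   // prints RUNNING, KILLED, FAILED
      }
    }
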

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 7 additions & 0 deletions
@@ -108,6 +108,13 @@ private[deploy] object DeployMessages {
 
   case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
 
+  /**
+   * Used by the MasterWebUI to request the master to decommission all workers that are active on
+   * any of the given hostnames.
+   * @param hostnames: A list of hostnames without the ports. Like "localhost", "foo.bar.com" etc
+   */
+  case class DecommissionWorkersOnHosts(hostnames: Seq[String])
+
   // Master to Worker
 
   sealed trait RegisterWorkerResponse
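The doc comment above names the MasterWebUI as the intended caller; a hedged sketch of that request path (the wrapper object, method name, and masterEndpointRef parameter are assumptions, not shown in this diff) might look like:

    // Sketch only: ask the Master to decommission workers on the given hosts and
    // read back how many matched. Lives inside org.apache.spark.deploy because
    // DeployMessages and RpcEndpointRef are Spark-internal.
    package org.apache.spark.deploy

    import org.apache.spark.deploy.DeployMessages.DecommissionWorkersOnHosts
    import org.apache.spark.rpc.RpcEndpointRef

    object DecommissionRequestSketch {
      def decommissionHosts(masterEndpointRef: RpcEndpointRef, hosts: Seq[String]): Int = {
        // A STANDBY master replies with 0 (see the Master.scala hunk later in this commit).
        masterEndpointRef.askSync[Int](DecommissionWorkersOnHosts(hosts))
      }
    }
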

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 15 additions & 14 deletions
@@ -188,23 +188,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     processing.remove(path.getName)
   }
 
-  private val blacklist = new ConcurrentHashMap[String, Long]
+  private val inaccessibleList = new ConcurrentHashMap[String, Long]
 
   // Visible for testing
-  private[history] def isBlacklisted(path: Path): Boolean = {
-    blacklist.containsKey(path.getName)
+  private[history] def isAccessible(path: Path): Boolean = {
+    !inaccessibleList.containsKey(path.getName)
   }
 
-  private def blacklist(path: Path): Unit = {
-    blacklist.put(path.getName, clock.getTimeMillis())
+  private def markInaccessible(path: Path): Unit = {
+    inaccessibleList.put(path.getName, clock.getTimeMillis())
   }
 
   /**
-   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   * Removes expired entries in the inaccessibleList, according to the provided
+   * `expireTimeInSeconds`.
    */
-  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+  private def clearInaccessibleList(expireTimeInSeconds: Long): Unit = {
     val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
-    blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
+    inaccessibleList.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
   }
 
   private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
@@ -470,7 +471,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
 
     val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
-      .filter { entry => !isBlacklisted(entry.getPath) }
+      .filter { entry => isAccessible(entry.getPath) }
       .filter { entry => !isProcessing(entry.getPath) }
       .flatMap { entry => EventLogFileReader(fs, entry) }
       .filter { reader =>
@@ -687,8 +688,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       case e: AccessControlException =>
         // We don't have read permissions on the log file
         logWarning(s"Unable to read log $rootPath", e)
-        blacklist(rootPath)
-        // SPARK-28157 We should remove this blacklisted entry from the KVStore
+        markInaccessible(rootPath)
+        // SPARK-28157 We should remove this inaccessible entry from the KVStore
         // to handle permission-only changes with the same file sizes later.
         listing.delete(classOf[LogInfo], rootPath.toString)
       case e: Exception =>
@@ -956,8 +957,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-    // Clean the blacklist from the expired entries.
-    clearBlacklist(CLEAN_INTERVAL_S)
+    // Clean the inaccessibleList from the expired entries.
+    clearInaccessibleList(CLEAN_INTERVAL_S)
   }
 
   private def deleteAttemptLogs(
@@ -1334,7 +1335,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private def deleteLog(fs: FileSystem, log: Path): Boolean = {
     var deleted = false
-    if (isBlacklisted(log)) {
+    if (!isAccessible(log)) {
       logDebug(s"Skipping deleting $log as we don't have permissions on it.")
     } else {
      try {
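Condensing the rename for readers skimming the hunks: inaccessibleList maps a log-file name to the time it was found unreadable, and entries expire on the next clean cycle. A self-contained sketch of that pattern (a distillation, using System.currentTimeMillis in place of the provider's Clock and Scala 2.12-style retain):

    // Mark / check / expire pattern, mirroring the hunk above.
    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    object InaccessibleListSketch {
      private val inaccessibleList = new ConcurrentHashMap[String, Long]

      def markInaccessible(name: String): Unit =
        inaccessibleList.put(name, System.currentTimeMillis())

      def isAccessible(name: String): Boolean =
        !inaccessibleList.containsKey(name)

      def clearInaccessibleList(expireTimeInSeconds: Long): Unit = {
        val expiredThreshold = System.currentTimeMillis() - expireTimeInSeconds * 1000
        inaccessibleList.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
      }

      def main(args: Array[String]): Unit = {
        markInaccessible("eventlog-app-123")
        println(isAccessible("eventlog-app-123"))   // false
        Thread.sleep(10)
        clearInaccessibleList(0)                    // expire everything older than "now"
        println(isAccessible("eventlog-app-123"))   // true again, entry was dropped
      }
    }
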

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 37 additions & 0 deletions
@@ -22,7 +22,9 @@ import java.util.{Date, Locale}
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable
 import scala.util.Random
+import scala.util.control.NonFatal
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, SparkHadoopUtil}
@@ -525,6 +527,13 @@ private[deploy] class Master(
     case KillExecutors(appId, executorIds) =>
       val formattedExecutorIds = formatExecutorIds(executorIds)
       context.reply(handleKillExecutors(appId, formattedExecutorIds))
+
+    case DecommissionWorkersOnHosts(hostnames) =>
+      if (state != RecoveryState.STANDBY) {
+        context.reply(decommissionWorkersOnHosts(hostnames))
+      } else {
+        context.reply(0)
+      }
   }
 
   override def onDisconnected(address: RpcAddress): Unit = {
@@ -863,6 +872,34 @@ private[deploy] class Master(
     true
   }
 
+  /**
+   * Decommission all workers that are active on any of the given hostnames. The decommissioning is
+   * asynchronously done by enqueueing WorkerDecommission messages to self. No checks are done about
+   * the prior state of the worker. So an already decommissioned worker will match as well.
+   *
+   * @param hostnames: A list of hostnames without the ports. Like "localhost", "foo.bar.com" etc
+   *
+   * Returns the number of workers that matched the hostnames.
+   */
+  private def decommissionWorkersOnHosts(hostnames: Seq[String]): Integer = {
+    val hostnamesSet = hostnames.map(_.toLowerCase(Locale.ROOT)).toSet
+    val workersToRemove = addressToWorker
+      .filterKeys(addr => hostnamesSet.contains(addr.host.toLowerCase(Locale.ROOT)))
+      .values
+
+    val workersToRemoveHostPorts = workersToRemove.map(_.hostPort)
+    logInfo(s"Decommissioning the workers with host:ports ${workersToRemoveHostPorts}")
+
+    // The workers are removed async to avoid blocking the receive loop for the entire batch
+    workersToRemove.foreach(wi => {
+      logInfo(s"Sending the worker decommission to ${wi.id} and ${wi.endpoint}")
+      self.send(WorkerDecommission(wi.id, wi.endpoint))
+    })
+
+    // Return the count of workers actually removed
+    workersToRemove.size
+  }
+
   private def decommissionWorker(worker: WorkerInfo): Unit = {
     if (worker.state != WorkerState.DECOMMISSIONED) {
       logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))

0 commit comments