[SPARK-3797] Run external shuffle service in Yarn NM #3082
Changes from all commits
b54a0c4
43dcb96
b4b1f0c
1bf5109
ea764e0
cd076a4
804e7ff
5b419b8
5bf9b7e
baff916
15a5b37
761f58a
f39daa6
f48b20c
9b6e058
5f8a96f
d1124e4
7b71d8f
6489db5
0eb6233
1c66046
0ee67a2
ef3ddae
@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
verifyBounds()

// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(

@@ -77,9 +76,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)

// How long an executor must be idle for before it is removed
private val removeThresholdSeconds = conf.getLong(
private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

validateSettings()

// Number of executors to add in the next round
private var numExecutorsToAdd = 1

@@ -103,17 +107,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Polling loop interval (ms)
private val intervalMillis: Long = 100

// Whether we are testing this class. This should only be used internally.
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock

/**
* Verify that the lower and upper bounds on the number of executors are valid.
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
*/
private def verifyBounds(): Unit = {
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
}

@@ -124,6 +125,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
if (schedulerBacklogTimeout <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
if (sustainedSchedulerBacklogTimeout <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
if (executorIdleTimeout <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Require external shuffle service for dynamic allocation
Review discussion on this check:

- Dynamic allocation, at the very least in the sense of dynamically adding executors, can still be useful without the external shuffle service. I hadn't noticed before that dynamic allocation will kill containers with blocks on them. Even with the external shuffle service, isn't this a problem for cached blocks? Would you be opposed to me adding a mode that avoids this?
- I personally don't see much use for dynamic allocation if you can't both add and remove executors. By default we start the cluster at the maximum number of executors anyway, so if we don't remove executors there's little point in adding them. If we do support both, then we have to require the external shuffle service, because killing executors without it is basically completely broken. Shuffle differs from caching in that keeping the shuffle files is not just an optimization but a necessity for correctness. Since shuffling is such a common operation in Spark, I think we should fail the application early, before the user realizes their job is being re-run over and over again. But yes, we have left the caching story out so far. My original design was to log a warning, or maybe throw an exception, if this is enabled and the user tries to cache data, and I believe we still need to add that.
- Even without the external shuffle service, there are opportunities to remove executors, no? Does the shuffle service keep data after jobs have completed?
- (Either way, this doesn't need to hold up this JIRA. Given the current implementation, disabling dynamic allocation when there's no external shuffle service seems like the right thing to do.)
- Hm, that's true. We can still call
// Otherwise, we may lose shuffle files when killing executors
if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
}

/**

@@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {

@@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
}
}
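For context, the validation added above means dynamic allocation now refuses to start unless the external shuffle service is on. A minimal sketch of a driver-side configuration that would pass validateSettings(), assuming the usual spark.dynamicAllocation.enabled switch (not shown in this diff) and using purely illustrative values:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative configuration only; the keys below are the ones checked in
    // validateSettings(), the numbers are placeholders.
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example")
      .set("spark.dynamicAllocation.enabled", "true")            // assumed top-level switch
      .set("spark.dynamicAllocation.minExecutors", "2")           // required, must be >= 0
      .set("spark.dynamicAllocation.maxExecutors", "20")          // required, must be >= minExecutors
      .set("spark.dynamicAllocation.executorIdleTimeout", "600")  // seconds, must be > 0
      .set("spark.shuffle.service.enabled", "true")               // otherwise validateSettings() throws

    val sc = new SparkContext(conf)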
@@ -181,6 +181,9 @@ echo "Spark $VERSION$GITREVSTRING built for Hadoop $SPARK_HADOOP_VERSION" > "$DI
# Copy jars
cp "$FWDIR"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
cp "$FWDIR"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"
cp "$FWDIR"/network/yarn/target/scala*/spark-network-yarn*.jar "$DISTDIR/lib/"
Review discussion on this line:

- I wonder if it would be nicer to use maven-shade-plugin to create a single jar for the NM aux service. That might make it easier for people to install it.
- Yes, I plan to do that, though in a separate PR. See my comment in andrewor14@f39daa6.
- @andrewor14
- Yes, I will fix this in a separate PR.
- Hey @witgo, I just pushed a hot fix. I didn't realize
cp "$FWDIR"/network/yarn/target/scala*/spark-network-shuffle*.jar "$DISTDIR/lib/"
cp "$FWDIR"/network/yarn/target/scala*/spark-network-common*.jar "$DISTDIR/lib/"

# Copy example sources (needed for python and SQL)
mkdir -p "$DISTDIR/examples/src/main"
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.sasl;

import java.lang.Override;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.sasl.SecretKeyHolder;

/**
* A class that manages shuffle secret used by the external shuffle service.
*/
public class ShuffleSecretManager implements SecretKeyHolder {
private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class);
private final ConcurrentHashMap<String, String> shuffleSecretMap;

private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");

// Spark user used for authenticating SASL connections
// Note that this must match the value in org.apache.spark.SecurityManager
private static final String SPARK_SASL_USER = "sparkSaslUser";

/**
* Convert the given string to a byte buffer. The resulting buffer can be converted back to
* the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external
* shuffle service represents shuffle secrets as bytes buffers instead of strings.
*/
public static ByteBuffer stringToBytes(String s) {
return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET));
}

/**
* Convert the given byte buffer to a string. The resulting string can be converted back to
* the same byte buffer through {@link #stringToBytes(String)}. This is used if the external
* shuffle service represents shuffle secrets as bytes buffers instead of strings.
*/
public static String bytesToString(ByteBuffer b) {
return new String(b.array(), UTF8_CHARSET);
}

public ShuffleSecretManager() {
shuffleSecretMap = new ConcurrentHashMap<String, String>();
}

/**
* Register an application with its secret.
* Executors need to first authenticate themselves with the same secret before
* fetching shuffle files written by other executors in this application.
*/
public void registerApp(String appId, String shuffleSecret) {
if (!shuffleSecretMap.contains(appId)) {
shuffleSecretMap.put(appId, shuffleSecret);
logger.info("Registered shuffle secret for application {}", appId);
} else {
logger.debug("Application {} already registered", appId);
}
}

/**
* Register an application with its secret specified as a byte buffer.
*/
public void registerApp(String appId, ByteBuffer shuffleSecret) {
registerApp(appId, bytesToString(shuffleSecret));
}

/**
* Unregister an application along with its secret.
* This is called when the application terminates.
*/
public void unregisterApp(String appId) {
if (shuffleSecretMap.contains(appId)) {
shuffleSecretMap.remove(appId);
logger.info("Unregistered shuffle secret for application {}", appId);
} else {
logger.warn("Attempted to unregister application {} when it is not registered", appId);
}
}

/**
* Return the Spark user for authenticating SASL connections.
*/
@Override
public String getSaslUser(String appId) {
return SPARK_SASL_USER;
}

/**
* Return the secret key registered with the given application.
* This key is used to authenticate the executors before they can fetch shuffle files
* written by this application from the external shuffle service. If the specified
* application is not registered, return null.
*/
@Override
public String getSecretKey(String appId) {
return shuffleSecretMap.get(appId);
}
}
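To make the intended lifecycle of these secrets concrete, here is a small hypothetical sketch (written in Scala, like the rest of Spark; the application id and secret are made-up values) showing how a shuffle service could drive this class:

    import java.nio.ByteBuffer
    import org.apache.spark.network.sasl.ShuffleSecretManager

    // Placeholders for illustration only.
    val appId = "application_1414455710101_0001"
    val secret = "not-a-real-secret"

    val secretManager = new ShuffleSecretManager()

    // Secrets may arrive as byte buffers; stringToBytes/bytesToString round-trip them.
    val secretBytes: ByteBuffer = ShuffleSecretManager.stringToBytes(secret)
    secretManager.registerApp(appId, secretBytes)

    // SASL authentication later resolves the user and key via the SecretKeyHolder interface.
    assert(secretManager.getSaslUser(appId) == "sparkSaslUser")
    assert(secretManager.getSecretKey(appId) == secret)

    // Drop the secret once the application terminates.
    secretManager.unregisterApp(appId)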
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-network-yarn_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Yarn Shuffle Service Code</name>
<url>http://spark.apache.org/</url>
<properties>
<sbt.project.name>network-yarn</sbt.project.name>
</properties>

<dependencies>
<!-- Core dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_2.10</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Provided dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
Review discussion on the spark.dynamicAllocation.testing flag:

- "spark.test.dynamicAllocation" might be a name more in line with other test-only properties. Also, please add some documentation or a name that indicates what it means to be testing.
- This was actually introduced in a different patch and I'm just moving it around. Also, by "testing" I mean mocking its behavior in the relevant test suite. I'm not sure what else I can add to that.
- Gotcha. That's sufficient, I think -- "During testing, the methods to actually kill and add executors are mocked out."
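On that note, the flag works by short-circuiting the real cluster calls, as in `val removeRequestAcknowledged = testing || sc.killExecutor(executorId)` earlier in this diff. A self-contained sketch of that pattern (the class and parameter names here are invented for illustration, not Spark internals):

    // When `testing` is true, the real kill request is skipped and treated as
    // acknowledged, so a test suite can exercise the allocation logic without a cluster.
    class AllocationLogic(testing: Boolean, killExecutor: String => Boolean) {
      def removeExecutor(executorId: String): Boolean =
        testing || killExecutor(executorId)
    }

    // In a test, the kill function should never actually be invoked:
    val underTest = new AllocationLogic(testing = true, killExecutor = _ => sys.error("should not be called"))
    assert(underTest.removeExecutor("exec-1"))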