
Commit 206c955

Merge remote-tracking branch 'origin/master' into thread-local-date-format

2 parents: 2397401 + d9e4cf6

261 files changed: +3970 additions, −3277 deletions


common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 8 additions & 3 deletions

@@ -84,7 +84,7 @@ private static class ClientPool {
 
   private final Class<? extends Channel> socketChannelClass;
   private EventLoopGroup workerGroup;
-  private PooledByteBufAllocator pooledAllocator;
+  private final PooledByteBufAllocator pooledAllocator;
   private final NettyMemoryMetrics metrics;
 
   public TransportClientFactory(
@@ -103,8 +103,13 @@ public TransportClientFactory(
       ioMode,
       conf.clientThreads(),
       conf.getModuleName() + "-client");
-    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
-      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
+    if (conf.sharedByteBufAllocators()) {
+      this.pooledAllocator = NettyUtils.getSharedPooledByteBufAllocator(
+        conf.preferDirectBufsForSharedByteBufAllocators(), false /* allowCache */);
+    } else {
+      this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
+        conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
+    }
     this.metrics = new NettyMemoryMetrics(
       this.pooledAllocator, conf.getModuleName() + "-client", conf);
   }

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 11 additions & 6 deletions

@@ -54,6 +54,7 @@ public class TransportServer implements Closeable {
   private ServerBootstrap bootstrap;
   private ChannelFuture channelFuture;
   private int port = -1;
+  private final PooledByteBufAllocator pooledAllocator;
   private NettyMemoryMetrics metrics;
 
   /**
@@ -69,6 +70,13 @@ public TransportServer(
     this.context = context;
     this.conf = context.getConf();
     this.appRpcHandler = appRpcHandler;
+    if (conf.sharedByteBufAllocators()) {
+      this.pooledAllocator = NettyUtils.getSharedPooledByteBufAllocator(
+        conf.preferDirectBufsForSharedByteBufAllocators(), true /* allowCache */);
+    } else {
+      this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
+        conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
+    }
     this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
 
     boolean shouldClose = true;
@@ -96,18 +104,15 @@ private void init(String hostToBind, int portToBind) {
       NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
     EventLoopGroup workerGroup = bossGroup;
 
-    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
-      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
-
     bootstrap = new ServerBootstrap()
       .group(bossGroup, workerGroup)
       .channel(NettyUtils.getServerChannelClass(ioMode))
-      .option(ChannelOption.ALLOCATOR, allocator)
+      .option(ChannelOption.ALLOCATOR, pooledAllocator)
       .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
-      .childOption(ChannelOption.ALLOCATOR, allocator);
+      .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
 
     this.metrics = new NettyMemoryMetrics(
-      allocator, conf.getModuleName() + "-server", conf);
+      pooledAllocator, conf.getModuleName() + "-server", conf);
 
     if (conf.backLog() > 0) {
       bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());

common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java

Lines changed: 48 additions & 0 deletions

@@ -36,6 +36,22 @@
  * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
  */
 public class NettyUtils {
+
+  /**
+   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
+   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
+   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
+   * at a premium.
+   *
+   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
+   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
+   * manually in Spark's configuration.
+   */
+  private static int MAX_DEFAULT_NETTY_THREADS = 8;
+
+  private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
+      new PooledByteBufAllocator[2];
+
   /** Creates a new ThreadFactory which prefixes each thread with the given name. */
   public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
     return new DefaultThreadFactory(threadPoolPrefix, true);
@@ -95,6 +111,38 @@ public static String getRemoteAddress(Channel channel) {
     return "<unknown remote>";
   }
 
+  /**
+   * Returns the default number of threads for both the Netty client and server thread pools.
+   * If numUsableCores is 0, we will use Runtime get an approximate number of available cores.
+   */
+  public static int defaultNumThreads(int numUsableCores) {
+    final int availableCores;
+    if (numUsableCores > 0) {
+      availableCores = numUsableCores;
+    } else {
+      availableCores = Runtime.getRuntime().availableProcessors();
+    }
+    return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
+  }
+
+  /**
+   * Returns the lazily created shared pooled ByteBuf allocator for the specified allowCache
+   * parameter value.
+   */
+  public static synchronized PooledByteBufAllocator getSharedPooledByteBufAllocator(
+      boolean allowDirectBufs,
+      boolean allowCache) {
+    final int index = allowCache ? 0 : 1;
+    if (_sharedPooledByteBufAllocator[index] == null) {
+      _sharedPooledByteBufAllocator[index] =
+        createPooledByteBufAllocator(
+          allowDirectBufs,
+          allowCache,
+          defaultNumThreads(0));
+    }
+    return _sharedPooledByteBufAllocator[index];
+  }
+
   /**
    * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
    * are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
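
A quick worked example of the arithmetic above (not part of this diff): the Javadoc caps the default Netty thread count at 8 and cites roughly 32 MB of off-heap overhead per core. A minimal standalone Java sketch, with a hypothetical class name and the overhead figure taken only from that comment:

// Illustrative sketch only: mirrors NettyUtils.defaultNumThreads and the rough
// 32 MB-per-thread off-heap estimate quoted in the Javadoc above.
public final class NettyThreadDefaults {

  private static final int MAX_DEFAULT_NETTY_THREADS = 8;      // same cap as the commit
  private static final int APPROX_OFFHEAP_MB_PER_THREAD = 32;  // estimate from the Javadoc

  static int defaultNumThreads(int numUsableCores) {
    int availableCores =
        numUsableCores > 0 ? numUsableCores : Runtime.getRuntime().availableProcessors();
    return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
  }

  public static void main(String[] args) {
    int threads = defaultNumThreads(0); // 0 means "ask the JVM for available processors"
    // On a 16-core host this prints 8 threads and ~256 MB rather than 16 and ~512 MB.
    System.out.println("netty threads: " + threads
        + ", approx off-heap overhead: " + (threads * APPROX_OFFHEAP_MB_PER_THREAD) + " MB");
  }
}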

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 18 additions & 0 deletions

@@ -265,6 +265,23 @@ public boolean saslServerAlwaysEncrypt() {
     return conf.getBoolean("spark.network.sasl.serverAlwaysEncrypt", false);
   }
 
+  /**
+   * Flag indicating whether to share the pooled ByteBuf allocators between the different Netty
+   * channels. If enabled then only two pooled ByteBuf allocators are created: one where caching
+   * is allowed (for transport servers) and one where not (for transport clients).
+   * When disabled a new allocator is created for each transport servers and clients.
+   */
+  public boolean sharedByteBufAllocators() {
+    return conf.getBoolean("spark.network.sharedByteBufAllocators.enabled", true);
+  }
+
+  /**
+   * If enabled then off-heap byte buffers will be prefered for the shared ByteBuf allocators.
+   */
+  public boolean preferDirectBufsForSharedByteBufAllocators() {
+    return conf.getBoolean("spark.network.io.preferDirectBufs", true);
+  }
+
   /**
    * The commons-crypto configuration for the module.
    */
@@ -313,4 +330,5 @@ public int chunkFetchHandlerThreads() {
       this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
     return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0));
   }
+
 }
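
A brief usage sketch (not part of this diff): both new methods read ordinary Spark network settings, so the shared allocators can be turned off, or steered toward heap buffers, through configuration. A minimal sketch assuming a standard SparkConf and made-up values:

import org.apache.spark.SparkConf;

public final class AllocatorConfExample {
  public static void main(String[] args) {
    // Keys are the ones read by TransportConf above; the values are example choices.
    SparkConf conf = new SparkConf()
        .set("spark.network.sharedByteBufAllocators.enabled", "false") // per-client/server allocators instead of shared ones
        .set("spark.network.io.preferDirectBufs", "false");            // prefer heap buffers for the shared allocators
    System.out.println(conf.get("spark.network.sharedByteBufAllocators.enabled"));
  }
}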

core/pom.xml

Lines changed: 4 additions & 0 deletions

@@ -33,6 +33,10 @@
     <name>Spark Project Core</name>
     <url>http://spark.apache.org/</url>
     <dependencies>
+    <dependency>
+      <groupId>com.thoughtworks.paranamer</groupId>
+      <artifactId>paranamer</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>

core/src/main/resources/org/apache/spark/ui/static/stagepage.js

Lines changed: 4 additions & 4 deletions

@@ -100,10 +100,10 @@ function getColumnNameForTaskMetricSummary(columnKey) {
             return "Scheduler Delay";
 
         case "diskBytesSpilled":
-            return "Shuffle spill (disk)";
+            return "Spill (disk)";
 
         case "memoryBytesSpilled":
-            return "Shuffle spill (memory)";
+            return "Spill (memory)";
 
         case "shuffleReadMetrics":
             return "Shuffle Read Size / Records";
@@ -842,7 +842,7 @@ $(document).ready(function () {
                         return "";
                     }
                 },
-                name: "Shuffle Spill (Memory)"
+                name: "Spill (Memory)"
             },
             {
                 data : function (row, type) {
@@ -852,7 +852,7 @@
                         return "";
                     }
                 },
-                name: "Shuffle Spill (Disk)"
+                name: "Spill (Disk)"
             },
             {
                 data : function (row, type) {

core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html

Lines changed: 4 additions & 4 deletions

@@ -59,8 +59,8 @@ <h4 class="title-table">Aggregated Metrics by Executor</h4>
         <th><span id="executor-summary-output">Output Size / Records</span></th>
         <th><span id="executor-summary-shuffle-read">Shuffle Read Size / Records</span></th>
         <th><span id="executor-summary-shuffle-write">Shuffle Write Size / Records</span></th>
-        <th>Shuffle Spill (Memory) </th>
-        <th>Shuffle Spill (Disk) </th>
+        <th>Spill (Memory) </th>
+        <th>Spill (Disk) </th>
     </thead>
     <tbody>
     </tbody>
@@ -111,8 +111,8 @@ <h4 id="tasksTitle" class="title-table"></h4>
           <th>Write Time</th>
           <th>Shuffle Write Size / Records</th>
           <th>Shuffle Read Size / Records</th>
-          <th>Shuffle Spill (Memory)</th>
-          <th>Shuffle Spill (Disk)</th>
+          <th>Spill (Memory)</th>
+          <th>Spill (Disk)</th>
           <th>Errors</th>
         </tr>
       </thead>

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 2 deletions

@@ -27,6 +27,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMaster
@@ -157,7 +158,7 @@ private[spark] class ExecutorAllocationManager(
 
   // Polling loop interval (ms)
   private val intervalMillis: Long = if (Utils.isTesting) {
-    conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
+    conf.get(TEST_SCHEDULE_INTERVAL)
   } else {
     100
   }
@@ -899,5 +900,4 @@ private[spark] class ExecutorAllocationManager(
 
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
-  val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval"
 }

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 22 additions & 32 deletions

@@ -29,6 +29,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.sasl.SecretKeyHolder
 import org.apache.spark.util.Utils
@@ -56,17 +57,13 @@ private[spark] class SecurityManager(
   private val WILDCARD_ACL = "*"
 
   private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
-  // keep spark.ui.acls.enable for backwards compatibility with 1.0
-  private var aclsOn =
-    sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
+  private var aclsOn = sparkConf.get(ACLS_ENABLE)
 
   // admin acls should be set before view or modify acls
-  private var adminAcls: Set[String] =
-    stringToSet(sparkConf.get("spark.admin.acls", ""))
+  private var adminAcls: Set[String] = sparkConf.get(ADMIN_ACLS).toSet
 
   // admin group acls should be set before view or modify group acls
-  private var adminAclsGroups : Set[String] =
-    stringToSet(sparkConf.get("spark.admin.acls.groups", ""))
+  private var adminAclsGroups: Set[String] = sparkConf.get(ADMIN_ACLS_GROUPS).toSet
 
   private var viewAcls: Set[String] = _
 
@@ -82,11 +79,11 @@ private[spark] class SecurityManager(
   private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
     Utils.getCurrentUserName())
 
-  setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
-  setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
+  setViewAcls(defaultAclUsers, sparkConf.get(UI_VIEW_ACLS))
+  setModifyAcls(defaultAclUsers, sparkConf.get(MODIFY_ACLS))
 
-  setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
-  setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));
+  setViewAclsGroups(sparkConf.get(UI_VIEW_ACLS_GROUPS))
+  setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS))
 
   private var secretKey: String = _
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
@@ -127,32 +124,25 @@ private[spark] class SecurityManager(
     opts
   }
 
-  /**
-   * Split a comma separated String, filter out any empty items, and return a Set of strings
-   */
-  private def stringToSet(list: String): Set[String] = {
-    list.split(',').map(_.trim).filter(!_.isEmpty).toSet
-  }
-
   /**
    * Admin acls should be set before the view or modify acls. If you modify the admin
    * acls you should also set the view and modify acls again to pick up the changes.
    */
-  def setViewAcls(defaultUsers: Set[String], allowedUsers: String) {
-    viewAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
+  def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
+    viewAcls = adminAcls ++ defaultUsers ++ allowedUsers
     logInfo("Changing view acls to: " + viewAcls.mkString(","))
   }
 
-  def setViewAcls(defaultUser: String, allowedUsers: String) {
+  def setViewAcls(defaultUser: String, allowedUsers: Seq[String]) {
     setViewAcls(Set[String](defaultUser), allowedUsers)
   }
 
   /**
    * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
    * acls groups you should also set the view and modify acls groups again to pick up the changes.
   */
-  def setViewAclsGroups(allowedUserGroups: String) {
-    viewAclsGroups = (adminAclsGroups ++ stringToSet(allowedUserGroups));
+  def setViewAclsGroups(allowedUserGroups: Seq[String]) {
+    viewAclsGroups = adminAclsGroups ++ allowedUserGroups
     logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
   }
 
@@ -179,17 +169,17 @@ private[spark] class SecurityManager(
    * Admin acls should be set before the view or modify acls. If you modify the admin
    * acls you should also set the view and modify acls again to pick up the changes.
   */
-  def setModifyAcls(defaultUsers: Set[String], allowedUsers: String) {
-    modifyAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
+  def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
+    modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers
     logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
   }
 
   /**
    * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
    * acls groups you should also set the view and modify acls groups again to pick up the changes.
   */
-  def setModifyAclsGroups(allowedUserGroups: String) {
-    modifyAclsGroups = (adminAclsGroups ++ stringToSet(allowedUserGroups));
+  def setModifyAclsGroups(allowedUserGroups: Seq[String]) {
+    modifyAclsGroups = adminAclsGroups ++ allowedUserGroups
    logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
  }
 
@@ -216,17 +206,17 @@ private[spark] class SecurityManager(
    * Admin acls should be set before the view or modify acls. If you modify the admin
    * acls you should also set the view and modify acls again to pick up the changes.
   */
-  def setAdminAcls(adminUsers: String) {
-    adminAcls = stringToSet(adminUsers)
+  def setAdminAcls(adminUsers: Seq[String]) {
+    adminAcls = adminUsers.toSet
     logInfo("Changing admin acls to: " + adminAcls.mkString(","))
   }
 
   /**
    * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
    * acls groups you should also set the view and modify acls groups again to pick up the changes.
   */
-  def setAdminAclsGroups(adminUserGroups: String) {
-    adminAclsGroups = stringToSet(adminUserGroups)
+  def setAdminAclsGroups(adminUserGroups: Seq[String]) {
+    adminAclsGroups = adminUserGroups.toSet
     logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
   }
 
@@ -416,7 +406,7 @@ private[spark] object SecurityManager {
 
   val k8sRegex = "k8s.*".r
   val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
-  val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
+  val SPARK_AUTH_SECRET_CONF = AUTH_SECRET.key
   // This is used to set auth secret to an executor's env variable. It should have the same
   // value as SPARK_AUTH_SECRET_CONF set in SparkConf
   val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
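
A brief configuration sketch (not part of this diff): the typed entries used above (ADMIN_ACLS, UI_VIEW_ACLS, MODIFY_ACLS) appear to correspond to the comma-separated ACL properties the old code read directly, so they can still be set the usual way. A minimal sketch with invented user names:

import org.apache.spark.SparkConf;

public final class AclConfExample {
  public static void main(String[] args) {
    // Keys are the Spark ACL settings referenced above; the user names are invented.
    SparkConf conf = new SparkConf()
        .set("spark.acls.enable", "true")
        .set("spark.admin.acls", "alice,bob")   // read through ADMIN_ACLS as a sequence
        .set("spark.ui.view.acls", "carol")     // read through UI_VIEW_ACLS
        .set("spark.modify.acls", "dave");      // read through MODIFY_ACLS
    System.out.println(conf.get("spark.admin.acls"));
  }
}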
