
Commit b5e65c5

gaborgsomogyi authored and Jackey Lee committed
[SPARK-26766][CORE] Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens
## What changes were proposed in this pull request?

The delegation token provider interface had a `fileSystems` parameter that is needed only by `HadoopFSDelegationTokenProvider`. This PR addresses that in the following way:

* Removed the `fileSystems` parameter from `HadoopDelegationTokenProvider`
* Moved `YarnSparkHadoopUtil.hadoopFSsToAccess` into `HadoopFSDelegationTokenProvider`
* Moved `spark.yarn.stagingDir` into core
* Moved `spark.yarn.access.namenodes` into core, where it is now a deprecated alternate of `spark.kerberos.access.hadoopFileSystems`
* Moved `spark.yarn.access.hadoopFileSystems` into core and renamed it to `spark.kerberos.access.hadoopFileSystems`

## How was this patch tested?

Existing unit tests.

Closes apache#23698 from gaborgsomogyi/SPARK-26766.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 049fbfd commit b5e65c5
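For context, a minimal migration sketch (hostnames are invented for illustration): the renamed key lives in core and is no longer YARN-specific, while the old YARN-scoped keys keep working as deprecated aliases registered in SparkConf.

```scala
import org.apache.spark.SparkConf

// Hypothetical user configuration; nn1/nn2 hostnames are illustrative only.
val conf = new SparkConf()
  // New, cluster-manager-agnostic key introduced by this change:
  .set("spark.kerberos.access.hadoopFileSystems",
    "hdfs://nn1.example.com:8020,webhdfs://nn2.example.com:50070")

// The old YARN-scoped spellings still resolve to the same entry through the
// deprecation aliases added below, but log a deprecation warning:
//   spark.yarn.access.namenodes          (deprecated since 2.2)
//   spark.yarn.access.hadoopFileSystems  (deprecated since 3.0)
```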

File tree

18 files changed, +195 -171 lines


core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 4 additions & 3 deletions
@@ -700,8 +700,6 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.akka.frameSize", "1.6")),
     "spark.yarn.jars" -> Seq(
       AlternateConfig("spark.yarn.jar", "2.0")),
-    "spark.yarn.access.hadoopFileSystems" -> Seq(
-      AlternateConfig("spark.yarn.access.namenodes", "2.2")),
     MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
       AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
     LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
@@ -715,7 +713,10 @@ private[spark] object SparkConf extends Logging {
     PRINCIPAL.key -> Seq(
       AlternateConfig("spark.yarn.principal", "3.0")),
     KERBEROS_RELOGIN_PERIOD.key -> Seq(
-      AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0"))
+      AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")),
+    KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq(
+      AlternateConfig("spark.yarn.access.namenodes", "2.2"),
+      AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0"))
   )
 
   /**

core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala

Lines changed: 0 additions & 1 deletion
@@ -39,7 +39,6 @@ private[security] class HBaseDelegationTokenProvider
   override def obtainDelegationTokens(
       hadoopConf: Configuration,
       sparkConf: SparkConf,
-      fileSystems: Set[FileSystem],
       creds: Credentials): Option[Long] = {
     try {
       val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

Lines changed: 1 addition & 9 deletions
@@ -144,7 +144,7 @@ private[spark] class HadoopDelegationTokenManager(
   def obtainDelegationTokens(creds: Credentials): Long = {
     delegationTokenProviders.values.flatMap { provider =>
       if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
-        provider.obtainDelegationTokens(hadoopConf, sparkConf, fileSystemsToAccess(), creds)
+        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
       } else {
         logDebug(s"Service ${provider.serviceName} does not require a token." +
           s" Check your configuration to see if security is disabled or not.")
@@ -181,14 +181,6 @@ private[spark] class HadoopDelegationTokenManager(
       .getOrElse(isEnabledDeprecated)
   }
 
-  /**
-   * List of file systems for which to obtain delegation tokens. The base implementation
-   * returns just the default file system in the given Hadoop configuration.
-   */
-  protected def fileSystemsToAccess(): Set[FileSystem] = {
-    Set(FileSystem.get(hadoopConf))
-  }
-
   private def scheduleRenewal(delay: Long): Unit = {
     val _delay = math.max(0, delay)
     logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala

Lines changed: 0 additions & 2 deletions
@@ -44,13 +44,11 @@ private[spark] trait HadoopDelegationTokenProvider {
    * Obtain delegation tokens for this service and get the time of the next renewal.
    * @param hadoopConf Configuration of current Hadoop Compatible system.
    * @param creds Credentials to add tokens and security keys to.
-   * @param fileSystems List of file systems for which to obtain delegation tokens.
    * @return If the returned tokens are renewable and can be renewed, return the time of the next
    *         renewal, otherwise None should be returned.
    */
   def obtainDelegationTokens(
       hadoopConf: Configuration,
       sparkConf: SparkConf,
-      fileSystems: Set[FileSystem],
       creds: Credentials): Option[Long]
 }
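To illustrate the slimmer interface, here is a hedged sketch of a provider written against the new three-argument signature. `HadoopDelegationTokenProvider` is `private[spark]`, so real implementations live inside Spark's own packages; the service name and token logic below are placeholders, not part of this commit.

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf

// Sketch of a provider against the post-SPARK-26766 trait: no fileSystems parameter.
private[spark] class ExampleDelegationTokenProvider extends HadoopDelegationTokenProvider {

  override def serviceName: String = "example"

  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = {
    // Only bother when the cluster is Kerberized.
    UserGroupInformation.isSecurityEnabled
  }

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch tokens for the external service and add them to `creds` here.
    // Each provider now decides for itself which resources it needs; only
    // HadoopFSDelegationTokenProvider cares about the filesystem list.
    None // None = token is not renewable / next renewal time unknown
  }
}
```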

core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala

Lines changed: 43 additions & 2 deletions
@@ -22,7 +22,7 @@ import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
@@ -44,9 +44,9 @@ private[deploy] class HadoopFSDelegationTokenProvider
   override def obtainDelegationTokens(
       hadoopConf: Configuration,
       sparkConf: SparkConf,
-      fileSystems: Set[FileSystem],
       creds: Credentials): Option[Long] = {
     try {
+      val fileSystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
       val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds)
 
       // Get the token renewal interval if it is not set. It will only be called once.
@@ -133,3 +133,44 @@ private[deploy] class HadoopFSDelegationTokenProvider
     if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
   }
 }
+
+private[deploy] object HadoopFSDelegationTokenProvider {
+  def hadoopFSsToAccess(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Set[FileSystem] = {
+    val filesystemsToAccess = sparkConf.get(KERBEROS_FILESYSTEMS_TO_ACCESS)
+
+    val defaultFS = FileSystem.get(hadoopConf)
+    val master = sparkConf.get("spark.master", null)
+    val stagingFS = if (master != null && master.contains("yarn")) {
+      sparkConf.get(STAGING_DIR).map(new Path(_).getFileSystem(hadoopConf))
+    } else {
+      None
+    }
+
+    // Add the list of available namenodes for all namespaces in HDFS federation.
+    // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+    // namespaces.
+    val hadoopFilesystems = if (!filesystemsToAccess.isEmpty || defaultFS.getScheme == "viewfs" ||
+        (stagingFS.isDefined && stagingFS.get.getScheme == "viewfs")) {
+      filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet
+    } else {
+      val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+      // Retrieving the filesystem for the nameservices where HA is not enabled
+      val filesystemsWithoutHA = nameservices.flatMap { ns =>
+        Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode =>
+          new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf)
+        }
+      }
+      // Retrieving the filesystem for the nameservices where HA is enabled
+      val filesystemsWithHA = nameservices.flatMap { ns =>
+        Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ =>
+          new Path(s"hdfs://$ns").getFileSystem(hadoopConf)
+        }
+      }
+      (filesystemsWithoutHA ++ filesystemsWithHA).toSet
+    }
+
+    hadoopFilesystems ++ stagingFS + defaultFS
+  }
+}
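A quick sketch of what the relocated helper yields under an assumed configuration. The object is `private[deploy]`, so only code under `org.apache.spark.deploy` (such as the new test suite added below) can call it directly; the hostnames and paths here are invented.

```scala
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider

val sparkConf = new SparkConf()
  .set("spark.master", "yarn")
  .set("spark.yarn.stagingDir", "hdfs://nn1.example.com:8020/user/alice/.sparkStaging")
  .set("spark.kerberos.access.hadoopFileSystems", "hdfs://nn2.example.com:8020")

val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://nn1.example.com:8020")

// Expected contents: the default FS, the staging-dir FS (master contains "yarn"),
// plus every filesystem listed in spark.kerberos.access.hadoopFileSystems.
val fss = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
```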

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 13 additions & 0 deletions
@@ -364,6 +364,14 @@ package object config {
     .checkValues(Set("keytab", "ccache"))
     .createWithDefault("keytab")
 
+  private[spark] val KERBEROS_FILESYSTEMS_TO_ACCESS =
+    ConfigBuilder("spark.kerberos.access.hadoopFileSystems")
+      .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " +
+        "that hosts fs.defaultFS does not need to be listed here.")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .intConf
     .createOptional
@@ -1253,4 +1261,9 @@ package object config {
     ConfigBuilder("spark.speculation.quantile")
       .doubleConf
       .createWithDefault(0.75)
+
+  private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
+    .doc("Staging directory used while submitting applications.")
+    .stringConf
+    .createOptional
 }
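For reference, a small sketch of how these new core entries behave when read back through the typed constants; this works only for code inside `org.apache.spark`, since both entries and the typed `SparkConf.get(entry)` accessor are `private[spark]`, and the values below are illustrative.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{KERBEROS_FILESYSTEMS_TO_ACCESS, STAGING_DIR}

val conf = new SparkConf()
  .set("spark.kerberos.access.hadoopFileSystems",
    "hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020")

// Comma-separated string config read back as a sequence (toSequence):
val fsUrls: Seq[String] = conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS)
// => Seq("hdfs://nn1.example.com:8020", "hdfs://nn2.example.com:8020")

// Optional entry (createOptional): None unless spark.yarn.stagingDir is set.
val staging: Option[String] = conf.get(STAGING_DIR)
```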

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 6 additions & 0 deletions
@@ -264,6 +264,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 
     conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
     assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84)
+
+    conf.set("spark.yarn.access.namenodes", "testNode")
+    assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode"))
+
+    conf.set("spark.yarn.access.hadoopFileSystems", "testNode")
+    assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode"))
   }
 
   test("akka deprecated configs") {

core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala

Lines changed: 0 additions & 2 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy.security
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.security.Credentials
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -36,7 +35,6 @@ private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationT
   override def obtainDelegationTokens(
       hadoopConf: Configuration,
       sparkConf: SparkConf,
-      fileSystems: Set[FileSystem],
       creds: Credentials): Option[Long] = throw new IllegalArgumentException
 }

core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.STAGING_DIR
+
+class HadoopFSDelegationTokenProviderSuite extends SparkFunSuite with Matchers {
+  test("hadoopFSsToAccess should return defaultFS even if not configured") {
+    val sparkConf = new SparkConf()
+    val defaultFS = "hdfs://localhost:8020"
+    val statingDir = "hdfs://localhost:8021"
+    sparkConf.set("spark.master", "yarn-client")
+    sparkConf.set(STAGING_DIR, statingDir)
+    val hadoopConf = new Configuration()
+    hadoopConf.set("fs.defaultFS", defaultFS)
+    val expected = Set(
+      new Path(defaultFS).getFileSystem(hadoopConf),
+      new Path(statingDir).getFileSystem(hadoopConf)
+    )
+    val result = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
+    result should be (expected)
+  }
+
+  test("SPARK-24149: retrieve all namenodes from HDFS") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.master", "yarn-client")
+    val basicFederationConf = new Configuration()
+    basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
+    basicFederationConf.set("dfs.nameservices", "ns1,ns2")
+    basicFederationConf.set("dfs.namenode.rpc-address.ns1", "localhost:8020")
+    basicFederationConf.set("dfs.namenode.rpc-address.ns2", "localhost:8021")
+    val basicFederationExpected = Set(
+      new Path("hdfs://localhost:8020").getFileSystem(basicFederationConf),
+      new Path("hdfs://localhost:8021").getFileSystem(basicFederationConf))
+    val basicFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
+      sparkConf, basicFederationConf)
+    basicFederationResult should be (basicFederationExpected)
+
+    // when viewfs is enabled, namespaces are handled by it, so we don't need to take care of them
+    val viewFsConf = new Configuration()
+    viewFsConf.addResource(basicFederationConf)
+    viewFsConf.set("fs.defaultFS", "viewfs://clusterX/")
+    viewFsConf.set("fs.viewfs.mounttable.clusterX.link./home", "hdfs://localhost:8020/")
+    val viewFsExpected = Set(new Path("viewfs://clusterX/").getFileSystem(viewFsConf))
+    HadoopFSDelegationTokenProvider
+      .hadoopFSsToAccess(sparkConf, viewFsConf) should be (viewFsExpected)
+
+    // invalid config should not throw NullPointerException
+    val invalidFederationConf = new Configuration()
+    invalidFederationConf.addResource(basicFederationConf)
+    invalidFederationConf.unset("dfs.namenode.rpc-address.ns2")
+    val invalidFederationExpected = Set(
+      new Path("hdfs://localhost:8020").getFileSystem(invalidFederationConf))
+    val invalidFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
+      sparkConf, invalidFederationConf)
+    invalidFederationResult should be (invalidFederationExpected)
+
+    // no namespaces defined, ie. old case
+    val noFederationConf = new Configuration()
+    noFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
+    val noFederationExpected = Set(
+      new Path("hdfs://localhost:8020").getFileSystem(noFederationConf))
+    val noFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf,
+      noFederationConf)
+    noFederationResult should be (noFederationExpected)
+
+    // federation and HA enabled
+    val federationAndHAConf = new Configuration()
+    federationAndHAConf.set("fs.defaultFS", "hdfs://clusterXHA")
+    federationAndHAConf.set("dfs.nameservices", "clusterXHA,clusterYHA")
+    federationAndHAConf.set("dfs.ha.namenodes.clusterXHA", "x-nn1,x-nn2")
+    federationAndHAConf.set("dfs.ha.namenodes.clusterYHA", "y-nn1,y-nn2")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn1", "localhost:8020")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn2", "localhost:8021")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn1", "localhost:8022")
+    federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn2", "localhost:8023")
+    federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterXHA",
+      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
+    federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterYHA",
+      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
+
+    val federationAndHAExpected = Set(
+      new Path("hdfs://clusterXHA").getFileSystem(federationAndHAConf),
+      new Path("hdfs://clusterYHA").getFileSystem(federationAndHAConf))
+    val federationAndHAResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
+      sparkConf, federationAndHAConf)
+    federationAndHAResult should be (federationAndHAExpected)
+  }
+}

docs/running-on-yarn.md

Lines changed: 1 addition & 17 deletions
@@ -520,10 +520,6 @@ for:
   filesystem if `spark.yarn.stagingDir` is not set);
 - if Hadoop federation is enabled, all the federated filesystems in the configuration.
 
-If an application needs to interact with other secure Hadoop filesystems, their URIs need to be
-explicitly provided to Spark at launch time. This is done by listing them in the
-`spark.yarn.access.hadoopFileSystems` property, described in the configuration section below.
-
 The YARN integration also supports custom delegation token providers using the Java Services
 mechanism (see `java.util.ServiceLoader`). Implementations of
 `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` can be made available to Spark
@@ -557,18 +553,6 @@ providers can be disabled individually by setting `spark.security.credentials.{s
   <br /> (Works also with the "local" master.)
   </td>
 </tr>
-<tr>
-  <td><code>spark.yarn.access.hadoopFileSystems</code></td>
-  <td>(none)</td>
-  <td>
-  A comma-separated list of secure Hadoop filesystems your Spark application is going to access. For
-  example, <code>spark.yarn.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
-  webhdfs://nn3.com:50070</code>. The Spark application must have access to the filesystems listed
-  and Kerberos must be properly configured to be able to access them (either in the same realm
-  or in a trusted realm). Spark acquires security tokens for each of the filesystems so that
-  the Spark application can access those remote Hadoop filesystems.
-  </td>
-</tr>
 <tr>
   <td><code>spark.yarn.kerberos.relogin.period</code></td>
   <td>1m</td>
@@ -674,7 +658,7 @@ spark.security.credentials.hive.enabled false
 spark.security.credentials.hbase.enabled false
 ```
 
-The configuration option `spark.yarn.access.hadoopFileSystems` must be unset.
+The configuration option `spark.kerberos.access.hadoopFileSystems` must be unset.
 
 # Using the Spark History Server to replace the Spark Web UI
 