Commit f74303c
Move the new logic into specialized classes. Add cleanup for old credentials files.
1 parent 2f9975c commit f74303c

7 files changed: +370 -228 lines
core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala

Lines changed: 108 additions & 0 deletions

@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.util.concurrent.{Executors, TimeUnit}
+import java.util.{Comparator, Arrays}
+
+import com.google.common.primitives.Longs
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{PathFilter, FileStatus, Path, FileSystem}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf}
+
+private[spark] class ExecutorDelegationTokenUpdater(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration) extends Logging {
+
+  @volatile private var lastCredentialsFileSuffix = 0
+
+  private lazy val delegationTokenRenewer =
+    Executors.newSingleThreadScheduledExecutor(
+      Utils.namedThreadFactory("Delegation Token Refresh Thread"))
+
+  // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
+  private lazy val executorUpdaterRunnable =
+    new Runnable {
+      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
+    }
+
+  def updateCredentialsIfRequired(): Unit = {
+    try {
+      sparkConf.getOption("spark.yarn.credentials.file").foreach { credentialsFile =>
+        val credentials = UserGroupInformation.getCurrentUser.getCredentials
+        val credentialsFilePath = new Path(credentialsFile)
+        val remoteFs = FileSystem.get(hadoopConf)
+        SparkHadoopUtil.get.listFilesSorted(
+          remoteFs, credentialsFilePath.getParent, credentialsFilePath.getName, ".tmp")
+          .lastOption.foreach { credentialsStatus =>
+            val suffix = getSuffixForCredentialsPath(credentialsStatus)
+            if (suffix > lastCredentialsFileSuffix) {
+              logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
+              val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+              lastCredentialsFileSuffix = suffix
+              UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+              val totalValidity = SparkHadoopUtil.get.getLatestTokenValidity(credentials) -
+                credentialsStatus.getModificationTime
+              val timeToRunRenewal =
+                credentialsStatus.getModificationTime + (0.8 * totalValidity).toLong
+              val timeFromNowToRenewal = timeToRunRenewal - System.currentTimeMillis()
+              logInfo("Updated delegation tokens, will check for new tokens in " +
+                timeFromNowToRenewal + " millis")
+              delegationTokenRenewer.schedule(
+                executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
+            } else {
+              // Check every hour to see if new credentials arrived.
+              logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
+                "tokens yet, will check again in an hour.")
+              delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+            }
+          }
+      }
+    } catch {
+      // Since the file may get deleted while we are reading it, catch the Exception and come
+      // back in an hour to try again.
+      case e: Exception =>
+        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
+        delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+    }
+  }
+
+  private def getCredentialsFromHDFSFile(
+      remoteFs: FileSystem,
+      tokenPath: Path): Credentials = {
+    val stream = remoteFs.open(tokenPath)
+    try {
+      val newCredentials = new Credentials()
+      newCredentials.readFields(stream)
+      newCredentials
+    } finally {
+      stream.close()
+    }
+  }
+
+  def stop(): Unit = {
+    delegationTokenRenewer.shutdown()
+  }
+
+  private def getSuffixForCredentialsPath(credentialsStatus: FileStatus): Int = {
+    val fileName = credentialsStatus.getPath.getName
+    fileName.substring(fileName.lastIndexOf("-") + 1).toInt
+  }
+}
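
Note: the updater schedules its next wake-up at roughly the 80% point of the token validity window, measured from the credentials file's modification time, and falls back to an hourly check otherwise. A minimal sketch of that arithmetic, using made-up timestamps and a hypothetical 24-hour validity (illustration only, not part of the commit):

// Illustration only: the scheduling arithmetic used by updateCredentialsIfRequired(),
// with hypothetical values for the file modification time and token max date.
val modificationTime = 1430000000000L                          // when the driver wrote the credentials file
val tokenMaxDate = modificationTime + 24L * 60 * 60 * 1000     // hypothetical 24-hour token validity
val totalValidity = tokenMaxDate - modificationTime            // 86,400,000 ms
val timeToRunRenewal = modificationTime + (0.8 * totalValidity).toLong  // the 80% mark
val timeFromNowToRenewal = timeToRunRenewal - System.currentTimeMillis()
// The executor then schedules executorUpdaterRunnable after timeFromNowToRenewal milliseconds,
// or checks again in an hour if no newer credentials file has appeared yet.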

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 50 additions & 14 deletions

@@ -17,20 +17,24 @@

 package org.apache.spark.deploy

+import java.io.{ByteArrayInputStream, DataInputStream}
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
+import java.util.{Comparator, Arrays}

+import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{PathFilter, FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation

 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{SerializableBuffer, Utils}
+import org.apache.spark.util.Utils

 import scala.collection.JavaConversions._

@@ -40,7 +44,7 @@ import scala.collection.JavaConversions._
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  protected val sparkConf = new SparkConf() // YarnSparkHadoopUtil requires this
+  val sparkConf = new SparkConf()
   val conf: Configuration = newConfiguration(sparkConf)
   UserGroupInformation.setConfiguration(conf)

@@ -55,16 +59,13 @@
   def runAsSparkUser(func: () => Unit) {
     val user = Utils.getCurrentUserName()
     logDebug("running as user: " + user)
-    updateCredentialsIfRequired()
     val ugi = UserGroupInformation.createRemoteUser(user)
     transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
     ugi.doAs(new PrivilegedExceptionAction[Unit] {
       def run: Unit = func()
     })
   }

-  def updateCredentialsIfRequired(): Unit = {}
-
   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
     for (token <- source.getTokens()) {
       dest.addToken(token)

@@ -125,14 +126,6 @@
     UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
   }

-  /**
-   * Schedule a login from the keytab and principal set using the --principal and --keytab
-   * arguments to spark-submit. This login happens only when the credentials of the current user
-   * are about to expire. This method reads SPARK_PRINCIPAL and SPARK_KEYTAB from the environment
-   * to do the login. This method is a no-op in non-YARN mode.
-   */
-  private[spark] def scheduleLoginFromKeytab(): Unit = {}
-
   /**
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will

@@ -213,6 +206,49 @@
     val baseStatus = fs.getFileStatus(basePath)
     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
   }
+
+  /**
+   * Lists all files in the given directory whose names start with the given prefix and do not
+   * end with the given suffix, sorted by modification time (oldest first).
+   * @param remoteFs the FileSystem to list files on
+   * @param dir the directory to list
+   * @param prefix only file names starting with this prefix are returned
+   * @param exclusionSuffix file names ending with this suffix are skipped
+   * @return the matching FileStatus entries, sorted by modification time
+   */
+  def listFilesSorted(
+      remoteFs: FileSystem,
+      dir: Path,
+      prefix: String,
+      exclusionSuffix: String): Array[FileStatus] = {
+    val fileStatuses = remoteFs.listStatus(dir,
+      new PathFilter {
+        override def accept(path: Path): Boolean = {
+          val name = path.getName
+          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+        }
+      })
+    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+      override def compare(o1: FileStatus, o2: FileStatus): Int = {
+        Longs.compare(o1.getModificationTime, o2.getModificationTime)
+      }
+    })
+    fileStatuses
+  }
+
+  /**
+   * Get the latest validity (max date) of the HDFS delegation tokens in the given Credentials.
+   * @param credentials the Credentials object to inspect
+   * @return the latest max date across all HDFS delegation tokens, or 0 if none are present
+   */
+  def getLatestTokenValidity(credentials: Credentials): Long = {
+    credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+      .map { t =>
+        val identifier = new DelegationTokenIdentifier()
+        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+        identifier.getMaxDate
+      }.foldLeft(0L)(math.max)
+  }
+
 }

 object SparkHadoopUtil {
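
Note: these two helpers are what ExecutorDelegationTokenUpdater calls to find the newest credentials file and to determine how long the HDFS tokens remain valid. A rough usage sketch; the HDFS path below is a hypothetical example, not taken from the commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

// Illustration only: list credentials files matching a prefix, skipping in-progress ".tmp" files,
// then read the latest HDFS delegation token validity from the current user's credentials.
val fs = FileSystem.get(new Configuration())
val credentialsPath = new Path("/user/spark/.sparkStaging/app_0001/credentials")  // hypothetical
val newest = SparkHadoopUtil.get.listFilesSorted(
    fs, credentialsPath.getParent, credentialsPath.getName, ".tmp")
  .lastOption  // sorted by modification time, so the last entry is the newest
newest.foreach { status =>
  val credentials = UserGroupInformation.getCurrentUser.getCredentials
  val validUntil = SparkHadoopUtil.get.getLatestTokenValidity(credentials)
  println(s"Newest credentials file: ${status.getPath}, HDFS token valid until $validUntil")
}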

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 5 additions & 1 deletion

@@ -29,7 +29,7 @@ import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{ExecutorDelegationTokenUpdater, SparkHadoopUtil}
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._

@@ -155,6 +155,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         driverConf.set(key, value)
       }
     }
+    // Periodically update the credentials for this user to ensure HDFS tokens get updated.
+    val tokenUpdater = new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf)
+    tokenUpdater.updateCredentialsIfRequired()
     val env = SparkEnv.createExecutorEnv(
       driverConf, executorId, hostname, port, cores, isLocal = false)

@@ -172,6 +175,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
     }
     env.actorSystem.awaitTermination()
+    tokenUpdater.stop()
   }
 }
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 0 additions & 3 deletions

@@ -241,9 +241,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
     driverActor = actorSystem.actorOf(
       Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)

-    // If a principal and keytab have been set, use that to create new credentials for executors
-    // periodically
-    SparkHadoopUtil.get.scheduleLoginFromKeytab()
   }

   def stopExecutors() {
0 commit comments
