Commit be7e8da

Merge pull request alteryx#23 from jerryshao/multi-user
Add Spark multi-user support for standalone mode and Mesos

This PR adds multi-user support to Spark for both standalone mode and Mesos (coarse- and fine-grained) mode. The user who submits the app can be specified through the environment variable `SPARK_USER`, or a default is used. Executors will communicate with Hadoop using the specified user name. I also fixed a bug in JobLogger where a job log written by a different user went to a folder without the right file permissions. I have split the previous [PR750](mesos/spark#750) into two PRs; this one only addresses multi-user support. I will tackle security auth in a subsequent PR, because it is a complicated problem, especially for long-running apps like Shark Server (both the Kerberos TGT and the HDFS delegation token must be renewed or re-created throughout the app's run time).
2 parents: aadeda5 + 12dc385
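
Since the feature is driven entirely by an environment variable, here is a hedged sketch of how an application might be submitted with an explicit user. The launcher script, example class, and master URL are placeholders, not part of this commit:

```scala
import scala.sys.process._

// Hypothetical launch of a driver with an explicit submitting user.
val driver = Process(
  Seq("./run-example", "org.apache.spark.examples.SparkPi", "spark://master:7077"),
  None,
  "SPARK_USER" -> "alice"
)
// driver.!  // the driver would resolve sparkUser to "alice" and export it to executors
```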

File tree

5 files changed: 417 additions, 379 deletions


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -145,6 +145,14 @@ class SparkContext(
     executorEnvs ++= environment
   }
 
+  // Set SPARK_USER for user who is running SparkContext.
+  val sparkUser = Option {
+    Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
+  }.getOrElse {
+    SparkContext.SPARK_UNKNOWN_USER
+  }
+  executorEnvs("SPARK_USER") = sparkUser
+
   // Create and start the scheduler
   private[spark] var taskScheduler: TaskScheduler = {
     // Regular expression used for local[N] master format
@@ -981,6 +989,8 @@ object SparkContext {
 
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
 
+  private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
+
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
```
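
The nested `Option` above gives a two-level fallback: an explicit `SPARK_USER` wins, then the JVM's `user.name`, then the `SPARK_UNKNOWN_USER` sentinel. A small stand-alone sketch of the same pattern, with made-up inputs:

```scala
// Mirrors the fallback pattern above with hypothetical inputs.
def resolveUser(sparkUserEnv: Option[String], userNameProp: Option[String]): String =
  Option(sparkUserEnv.getOrElse(userNameProp.orNull)).getOrElse("<unknown>")

resolveUser(Some("alice"), Some("bob"))  // "alice"     (SPARK_USER takes precedence)
resolveUser(None, Some("bob"))           // "bob"       (falls back to user.name)
resolveUser(None, None)                  // "<unknown>" (the sentinel)
```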

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 15 additions & 3 deletions
```diff
@@ -17,8 +17,11 @@
 
 package org.apache.spark.deploy
 
+import java.security.PrivilegedExceptionAction
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.SparkException
 
@@ -27,6 +30,15 @@ import org.apache.spark.SparkException
  */
 private[spark]
 class SparkHadoopUtil {
+  val conf = newConfiguration()
+  UserGroupInformation.setConfiguration(conf)
+
+  def runAsUser(user: String)(func: () => Unit) {
+    val ugi = UserGroupInformation.createRemoteUser(user)
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = func()
+    })
+  }
 
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -42,9 +54,9 @@ class SparkHadoopUtil {
 
   def isYarnMode(): Boolean = { false }
 }
-
+
 object SparkHadoopUtil {
-  private val hadoop = {
+  private val hadoop = {
     val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
     if (yarnMode) {
       try {
@@ -56,7 +68,7 @@ object SparkHadoopUtil {
       new SparkHadoopUtil
     }
   }
-
+
   def get: SparkHadoopUtil = {
     hadoop
   }
```
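
The new `runAsUser` helper simply wraps a block in `UserGroupInformation.doAs` for a remote (non-Kerberos) user. A minimal usage sketch from the perspective of Spark-internal code (the class is `private[spark]`); the user name and path are made up:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.deploy.SparkHadoopUtil

// Hypothetical caller inside org.apache.spark: Hadoop calls made in the closure
// are attributed to "alice" via a remote UGI (no credentials are acquired).
SparkHadoopUtil.get.runAsUser("alice") { () =>
  val fs = FileSystem.get(SparkHadoopUtil.get.conf)
  fs.exists(new Path("/user/alice"))  // evaluated as user "alice"
}
```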

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 5 additions & 2 deletions
```diff
@@ -25,8 +25,9 @@ import java.util.concurrent._
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.scheduler._
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util.Utils
 
@@ -129,6 +130,8 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, serializedTask)
     runningTasks.put(taskId, tr)
@@ -176,7 +179,7 @@
       }
     }
 
-    override def run() {
+    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
```
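
With `TaskRunner.run()` now delegated through `runAsUser(sparkUser)`, everything a task does on that thread executes inside the remote user's `doAs` scope, so Hadoop sees the submitting user rather than the OS account running the executor. A small stand-alone sketch of that effect (the user name is illustrative):

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

val ugi = UserGroupInformation.createRemoteUser("alice")
ugi.doAs(new PrivilegedExceptionAction[Unit] {
  def run(): Unit = {
    // Inside doAs, Hadoop's notion of the current user is the remote UGI.
    println(UserGroupInformation.getCurrentUser.getShortUserName)  // prints "alice"
  }
})
```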
