Skip to content

Commit 1654a42

Browse files
avovchenko authored and ekrivokon-mapr committed
Run Spark job with the specified username for driver and executor (apache#283)
1 parent d88b2ba commit 1654a42

File tree

6 files changed

+130
-47
lines changed

6 files changed

+130
-47
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2421,8 +2421,33 @@ private[spark] object Utils extends Logging {
24212421
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
24222422
}
24232423

/**
 * Resolves the numeric POSIX uid of the current Spark user by invoking
 * the system `id -u` command.
 *
 * @return the uid as a string, or the empty string if the lookup fails
 *         (callers treat "" as "uid unknown" and fail submission).
 */
def getCurrentUserId(): String = {
  val username = getCurrentUserName()
  // Invoke `id` directly instead of `bash -c` with string concatenation:
  // concatenating the username into a shell command line was vulnerable
  // to shell metacharacters in the username.
  val cmdSeq = Seq("id", "-u", username)

  try {
    executeAndGetOutput(cmdSeq).stripLineEnd
  } catch {
    // NonFatal keeps truly fatal errors (OOM, interrupts) propagating
    // instead of being swallowed into an empty uid.
    case scala.util.control.NonFatal(e) =>
      logError(s"Error getting user id for user=$username", e)
      ""
  }
}
2436+
24242437
val EMPTY_USER_GROUPS = Set.empty[String]
24252438

/**
 * Resolves the numeric POSIX group ids of the given user via `id -G`.
 *
 * @param sparkConf Spark configuration (unused here; kept so the signature
 *                  parallels getCurrentUserGroups).
 * @param username  the OS user whose group ids are requested.
 * @return the set of group-id strings, or EMPTY_USER_GROUPS on failure.
 */
def getCurrentUserGroupsIds(sparkConf: SparkConf, username: String): Set[String] = {
  // Invoke `id` directly rather than via `bash -c` string concatenation,
  // which was vulnerable to shell metacharacters in the username.
  val cmdSeq = Seq("id", "-G", username)
  try {
    // Filter out empty tokens so blank command output yields an empty set
    // instead of Set(""); the result is the expression value — no `return`.
    executeAndGetOutput(cmdSeq).stripLineEnd.split("\\s+").filter(_.nonEmpty).toSet
  } catch {
    case scala.util.control.NonFatal(e) =>
      logError(s"Error getting groups ids for user=$username", e)
      EMPTY_USER_GROUPS
  }
}
2450+
24262451
// Returns the groups to which the current user belongs.
24272452
def getCurrentUserGroups(sparkConf: SparkConf, username: String): Set[String] = {
24282453
val groupProviderClassName = sparkConf.get("spark.user.groups.mapping",

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -305,6 +305,12 @@ private[spark] object Config extends Logging {
305305
.stringConf
306306
.createWithDefault("mapr-cluster-configmap")
307307

// Name of the Kubernetes secret that carries the submitting user's
// credentials; mounted into driver/executor pods via envFrom/secretRef.
val MAPR_CLUSTER_USER_SECRETS =
  ConfigBuilder("spark.mapr.user.secrets")
    .doc("Name of the mapr user secrets")
    .stringConf
    .createWithDefault("mapr-user-secrets")
308314
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
309315
"spark.kubernetes.authenticate.submission"
310316

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -79,8 +79,9 @@ private[spark] object Constants {
7979
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
8080
val MEMORY_OVERHEAD_MIN_MIB = 384L
8181

82-
// Mapr specific Constants
83-
val CLDB_HOSTS = "MAPR_CLDB_HOSTS"
84-
val ZK_HOSTS = "MAPR_ZK_HOSTS"
85-
val CLUSTER_NAME = "MAPR_CLUSTER"
// User specific Constants
// Environment-variable names used to propagate the submitting user's
// identity (name, group names, uid, group ids) into driver/executor pods.
val CURRENT_USER = "CURRENT_USER"
val USER_GROUPS = "USER_GROUPS"
val CURRENT_USER_ID = "USER_ID"
val USER_GROUPS_IDS = "USER_GROUPS_IDS"
8687
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala

Lines changed: 37 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -17,15 +17,14 @@
1717
package org.apache.spark.deploy.k8s.submit.steps
1818

1919
import scala.collection.JavaConverters._
20-
2120
import io.fabric8.kubernetes.api.model._
22-
2321
import org.apache.spark.{SparkConf, SparkException}
2422
import org.apache.spark.deploy.k8s.Config._
2523
import org.apache.spark.deploy.k8s.Constants._
2624
import org.apache.spark.deploy.k8s.KubernetesUtils
2725
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
2826
import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
27+
import org.apache.spark.util.Utils
2928

3029
/**
3130
* Performs basic configuration for the driver pod.
@@ -112,6 +111,16 @@ private[spark] class BasicDriverConfigurationStep(
112111
}
113112

114113
val clusterConfMap = sparkConf.get(MAPR_CLUSTER_CONFIGMAP).toString
114+
val clusterUserSecrets = sparkConf.get(MAPR_CLUSTER_USER_SECRETS).toString
115+
116+
val username = Utils.getCurrentUserName()
117+
val userGroups = Utils.getCurrentUserGroups(sparkConf, username)
118+
val userId = Utils.getCurrentUserId()
119+
val userGroupsIds = Utils.getCurrentUserGroupsIds(sparkConf, username)
120+
121+
if (userId.length() == 0 || userGroupsIds.size == 0) {
122+
throw new RuntimeException(s"Error getting uid/gid for user=$username")
123+
}
115124

116125
val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
117126
.withName(DRIVER_CONTAINER_NAME)
@@ -120,6 +129,22 @@ private[spark] class BasicDriverConfigurationStep(
120129
.addAllToEnv(driverCustomEnvs.asJava)
121130
.addAllToEnv(clusterEnvs.asJava)
122131
.addToEnv(driverExtraClasspathEnv.toSeq: _*)
132+
.addNewEnv()
133+
.withName(CURRENT_USER)
134+
.withValue(username)
135+
.endEnv()
136+
.addNewEnv()
137+
.withName(USER_GROUPS)
138+
.withValue(userGroups.mkString(" "))
139+
.endEnv()
140+
.addNewEnv()
141+
.withName(CURRENT_USER_ID)
142+
.withValue(userId)
143+
.endEnv()
144+
.addNewEnv()
145+
.withName(USER_GROUPS_IDS)
146+
.withValue(userGroupsIds.mkString(" "))
147+
.endEnv()
123148
.addNewEnv()
124149
.withName(ENV_DRIVER_MEMORY)
125150
.withValue(driverMemoryString)
@@ -138,27 +163,16 @@ private[spark] class BasicDriverConfigurationStep(
138163
.withNewFieldRef("v1", "status.podIP")
139164
.build())
140165
.endEnv()
141-
.addNewEnv()
142-
.withName(CLDB_HOSTS)
143-
.withNewValueFrom()
144-
.withConfigMapKeyRef(
145-
new ConfigMapKeySelector(CLDB_HOSTS, clusterConfMap, true))
146-
.endValueFrom()
147-
.endEnv()
148-
.addNewEnv()
149-
.withName(ZK_HOSTS)
150-
.withNewValueFrom()
151-
.withConfigMapKeyRef(
152-
new ConfigMapKeySelector(ZK_HOSTS, clusterConfMap, true))
153-
.endValueFrom()
154-
.endEnv()
155-
.addNewEnv()
156-
.withName(CLUSTER_NAME)
157-
.withNewValueFrom()
158-
.withConfigMapKeyRef(
159-
new ConfigMapKeySelector(CLUSTER_NAME, clusterConfMap, true))
160-
.endValueFrom()
161-
.endEnv()
166+
.addNewEnvFrom()
167+
.withNewConfigMapRef()
168+
.withName(clusterConfMap)
169+
.endConfigMapRef()
170+
.endEnvFrom()
171+
.addNewEnvFrom()
172+
.withNewSecretRef()
173+
.withName(clusterUserSecrets)
174+
.endSecretRef()
175+
.endEnvFrom()
162176
.withNewResources()
163177
.addToRequests("cpu", driverCpuQuantity)
164178
.addToRequests("memory", driverMemoryQuantity)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 31 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -17,10 +17,8 @@
1717
package org.apache.spark.scheduler.cluster.k8s
1818

1919
import scala.collection.JavaConverters._
20-
2120
import io.fabric8.kubernetes.api.model._
22-
23-
import org.apache.spark.{SparkConf, SparkException}
21+
import org.apache.spark.{SparkConf, SparkContext, SparkException}
2422
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, PodWithDetachedInitContainer}
2523
import org.apache.spark.deploy.k8s.Config._
2624
import org.apache.spark.deploy.k8s.Constants._
@@ -179,33 +177,48 @@ private[spark] class ExecutorPodFactory(
179177
}
180178

181179
val clusterConfMap = sparkConf.get(MAPR_CLUSTER_CONFIGMAP).toString
180+
val clusterUserSecrets = sparkConf.get(MAPR_CLUSTER_USER_SECRETS).toString
181+
182+
val username = Utils.getCurrentUserName()
183+
val userGroups = Utils.getCurrentUserGroups(sparkConf, username)
184+
val userId = Utils.getCurrentUserId()
185+
val userGroupsIds = Utils.getCurrentUserGroupsIds(sparkConf, username)
186+
187+
if (userId.length() == 0 || userGroupsIds.size == 0) {
188+
throw new RuntimeException(s"Error getting uid/gid for user=$username")
189+
}
182190

183191
val executorContainer = new ContainerBuilder()
184192
.withName("executor")
185193
.withImage(executorContainerImage)
186194
.withImagePullPolicy(imagePullPolicy)
187195
.addAllToEnv(clusterEnvs.asJava)
188196
.addNewEnv()
189-
.withName(CLDB_HOSTS)
190-
.withNewValueFrom()
191-
.withConfigMapKeyRef(
192-
new ConfigMapKeySelector(CLDB_HOSTS, clusterConfMap, false))
193-
.endValueFrom()
197+
.withName(CURRENT_USER)
198+
.withValue(username)
199+
.endEnv()
200+
.addNewEnv()
201+
.withName(USER_GROUPS)
202+
.withValue(userGroups.mkString(" "))
194203
.endEnv()
195204
.addNewEnv()
196-
.withName(ZK_HOSTS)
197-
.withNewValueFrom()
198-
.withConfigMapKeyRef(
199-
new ConfigMapKeySelector(ZK_HOSTS, clusterConfMap, false))
200-
.endValueFrom()
205+
.withName(CURRENT_USER_ID)
206+
.withValue(userId)
201207
.endEnv()
202208
.addNewEnv()
203-
.withName(CLUSTER_NAME)
204-
.withNewValueFrom()
205-
.withConfigMapKeyRef(
206-
new ConfigMapKeySelector(CLUSTER_NAME, clusterConfMap, false))
207-
.endValueFrom()
209+
.withName(USER_GROUPS_IDS)
210+
.withValue(userGroupsIds.mkString(" "))
208211
.endEnv()
212+
.addNewEnvFrom()
213+
.withNewConfigMapRef()
214+
.withName(clusterConfMap)
215+
.endConfigMapRef()
216+
.endEnvFrom()
217+
.addNewEnvFrom()
218+
.withNewSecretRef()
219+
.withName(clusterUserSecrets)
220+
.endSecretRef()
221+
.endEnvFrom()
209222
.withNewResources()
210223
.addToRequests("memory", executorMemoryQuantity)
211224
.addToLimits("memory", executorMemoryLimitQuantity)

resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh

Lines changed: 26 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -139,10 +139,34 @@ case "$SPARK_K8S_CMD" in
139139
exit 1
140140
esac
141141

# Create each group named in $USER_GROUPS (with the matching gid taken
# positionally from $USER_GROUPS_IDS) and add $CURRENT_USER to it.
# Assumes both env vars are parallel, space-separated lists — they are
# populated by the driver/executor pod configuration steps.
function createUserGroups() {
  # Intentionally unquoted: word-splitting turns the space-separated
  # lists into arrays.
  local groups=($USER_GROUPS)
  local groupIds=($USER_GROUPS_IDS)

  for i in "${!groups[@]}"
  do
    # -f: exit successfully even if the group already exists.
    groupadd -f -g "${groupIds[i]}" "${groups[i]}"
    usermod -a -G "${groups[i]}" "$CURRENT_USER"
  done
}
152+
153+
# Create $CURRENT_USER with uid $USER_ID if the account does not already
# exist in the container.
# NOTE(review): relies on the image's `adduser` accepting `-u UID`
# (true for busybox/alpine; Debian's adduser uses --uid) — confirm for
# the base image in use.
function createUser() {
  if ! id "$CURRENT_USER" >/dev/null 2>&1; then
    adduser -u "$USER_ID" "$CURRENT_USER"
  fi
}
158+
142159
#Run configure.sh
143160
if [ ! $SPARK_K8S_CMD == "init" ]; then
144161
/opt/mapr/server/configure.sh -c -C $MAPR_CLDB_HOSTS -Z $MAPR_ZK_HOSTS -N $MAPR_CLUSTER
145-
fi
162+
createUser
163+
createUserGroups
164+
if [ $SPARK_K8S_CMD == "executor" ]; then
165+
chown $CURRENT_USER ./
166+
fi
146167

147-
# Execute the container CMD under tini for better hygiene
168+
exec sudo -u $CURRENT_USER "${CMD[@]}"
169+
else
170+
# Execute the container CMD
148171
exec "${CMD[@]}"
172+
fi

Comments (0)