Skip to content

Commit d88b2ba

Browse files
avovchenkoekrivokonmapr
authored andcommitted
Read cluster configs from configMap (apache#275)
1 parent bedfde4 commit d88b2ba

File tree

4 files changed

+58
-1
lines changed

4 files changed

+58
-1
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,12 @@ private[spark] object Config extends Logging {
299299
.stringConf
300300
.createOptional
301301

302+
val MAPR_CLUSTER_CONFIGMAP =
303+
ConfigBuilder("spark.mapr.cluster.configMap")
304+
.doc("Name of the mapr cluster config map")
305+
.stringConf
306+
.createWithDefault("mapr-cluster-configmap")
307+
302308
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
303309
"spark.kubernetes.authenticate.submission"
304310

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,9 @@ private[spark] object Constants {
7878
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
7979
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
8080
val MEMORY_OVERHEAD_MIN_MIB = 384L
81+
82+
// Mapr specific Constants
83+
val CLDB_HOSTS = "MAPR_CLDB_HOSTS"
84+
val ZK_HOSTS = "MAPR_ZK_HOSTS"
85+
val CLUSTER_NAME = "MAPR_CLUSTER"
8186
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.steps
1818

1919
import scala.collection.JavaConverters._
2020

21-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
21+
import io.fabric8.kubernetes.api.model._
2222

2323
import org.apache.spark.{SparkConf, SparkException}
2424
import org.apache.spark.deploy.k8s.Config._
@@ -111,6 +111,8 @@ private[spark] class BasicDriverConfigurationStep(
111111
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
112112
}
113113

114+
val clusterConfMap = sparkConf.get(MAPR_CLUSTER_CONFIGMAP).toString
115+
114116
val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
115117
.withName(DRIVER_CONTAINER_NAME)
116118
.withImage(driverContainerImage)
@@ -136,6 +138,27 @@ private[spark] class BasicDriverConfigurationStep(
136138
.withNewFieldRef("v1", "status.podIP")
137139
.build())
138140
.endEnv()
141+
.addNewEnv()
142+
.withName(CLDB_HOSTS)
143+
.withNewValueFrom()
144+
.withConfigMapKeyRef(
145+
new ConfigMapKeySelector(CLDB_HOSTS, clusterConfMap, true))
146+
.endValueFrom()
147+
.endEnv()
148+
.addNewEnv()
149+
.withName(ZK_HOSTS)
150+
.withNewValueFrom()
151+
.withConfigMapKeyRef(
152+
new ConfigMapKeySelector(ZK_HOSTS, clusterConfMap, true))
153+
.endValueFrom()
154+
.endEnv()
155+
.addNewEnv()
156+
.withName(CLUSTER_NAME)
157+
.withNewValueFrom()
158+
.withConfigMapKeyRef(
159+
new ConfigMapKeySelector(CLUSTER_NAME, clusterConfMap, true))
160+
.endValueFrom()
161+
.endEnv()
139162
.withNewResources()
140163
.addToRequests("cpu", driverCpuQuantity)
141164
.addToRequests("memory", driverMemoryQuantity)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,34 @@ private[spark] class ExecutorPodFactory(
178178
.build()
179179
}
180180

181+
val clusterConfMap = sparkConf.get(MAPR_CLUSTER_CONFIGMAP).toString
182+
181183
val executorContainer = new ContainerBuilder()
182184
.withName("executor")
183185
.withImage(executorContainerImage)
184186
.withImagePullPolicy(imagePullPolicy)
185187
.addAllToEnv(clusterEnvs.asJava)
188+
.addNewEnv()
189+
.withName(CLDB_HOSTS)
190+
.withNewValueFrom()
191+
.withConfigMapKeyRef(
192+
new ConfigMapKeySelector(CLDB_HOSTS, clusterConfMap, false))
193+
.endValueFrom()
194+
.endEnv()
195+
.addNewEnv()
196+
.withName(ZK_HOSTS)
197+
.withNewValueFrom()
198+
.withConfigMapKeyRef(
199+
new ConfigMapKeySelector(ZK_HOSTS, clusterConfMap, false))
200+
.endValueFrom()
201+
.endEnv()
202+
.addNewEnv()
203+
.withName(CLUSTER_NAME)
204+
.withNewValueFrom()
205+
.withConfigMapKeyRef(
206+
new ConfigMapKeySelector(CLUSTER_NAME, clusterConfMap, false))
207+
.endValueFrom()
208+
.endEnv()
186209
.withNewResources()
187210
.addToRequests("memory", executorMemoryQuantity)
188211
.addToLimits("memory", executorMemoryLimitQuantity)

0 commit comments

Comments
 (0)